Convert master_info rpc to new style result
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 class RPCFail(Exception):
50   """Class denoting RPC failure.
51
52   Its argument is the error message.
53
54   """
55
56 def _Fail(msg, *args, **kwargs):
57   """Log an error and the raise an RPCFail exception.
58
59   This exception is then handled specially in the ganeti daemon and
60   turned into a 'failed' return type. As such, this function is a
61   useful shortcut for logging the error and returning it to the master
62   daemon.
63
64   @type msg: string
65   @param msg: the text of the exception
66   @raise RPCFail
67
68   """
69   if args:
70     msg = msg % args
71   if "exc" in kwargs and kwargs["exc"]:
72     logging.exception(msg)
73   else:
74     logging.error(msg)
75   raise RPCFail(msg)
76
77
78 def _GetConfig():
79   """Simple wrapper to return a SimpleStore.
80
81   @rtype: L{ssconf.SimpleStore}
82   @return: a SimpleStore instance
83
84   """
85   return ssconf.SimpleStore()
86
87
88 def _GetSshRunner(cluster_name):
89   """Simple wrapper to return an SshRunner.
90
91   @type cluster_name: str
92   @param cluster_name: the cluster name, which is needed
93       by the SshRunner constructor
94   @rtype: L{ssh.SshRunner}
95   @return: an SshRunner instance
96
97   """
98   return ssh.SshRunner(cluster_name)
99
100
101 def _Decompress(data):
102   """Unpacks data compressed by the RPC client.
103
104   @type data: list or tuple
105   @param data: Data sent by RPC client
106   @rtype: str
107   @return: Decompressed data
108
109   """
110   assert isinstance(data, (list, tuple))
111   assert len(data) == 2
112   (encoding, content) = data
113   if encoding == constants.RPC_ENCODING_NONE:
114     return content
115   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
116     return zlib.decompress(base64.b64decode(content))
117   else:
118     raise AssertionError("Unknown data encoding")
119
120
121 def _CleanDirectory(path, exclude=None):
122   """Removes all regular files in a directory.
123
124   @type path: str
125   @param path: the directory to clean
126   @type exclude: list
127   @param exclude: list of files to be excluded, defaults
128       to the empty list
129
130   """
131   if not os.path.isdir(path):
132     return
133   if exclude is None:
134     exclude = []
135   else:
136     # Normalize excluded paths
137     exclude = [os.path.normpath(i) for i in exclude]
138
139   for rel_name in utils.ListVisibleFiles(path):
140     full_name = os.path.normpath(os.path.join(path, rel_name))
141     if full_name in exclude:
142       continue
143     if os.path.isfile(full_name) and not os.path.islink(full_name):
144       utils.RemoveFile(full_name)
145
146
147 def JobQueuePurge():
148   """Removes job queue files and archived jobs.
149
150   @rtype: None
151
152   """
153   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
154   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
155
156
157 def GetMasterInfo():
158   """Returns master information.
159
160   This is an utility function to compute master information, either
161   for consumption here or from the node daemon.
162
163   @rtype: tuple
164   @return: True, (master_netdev, master_ip, master_name) in case of success
165   @raise RPCFail: in case of errors
166
167   """
168   try:
169     cfg = _GetConfig()
170     master_netdev = cfg.GetMasterNetdev()
171     master_ip = cfg.GetMasterIP()
172     master_node = cfg.GetMasterNode()
173   except errors.ConfigurationError, err:
174     _Fail("Cluster configuration incomplete", exc=True)
175   return True, (master_netdev, master_ip, master_node)
176
177
178 def StartMaster(start_daemons):
179   """Activate local node as master node.
180
181   The function will always try activate the IP address of the master
182   (unless someone else has it). It will also start the master daemons,
183   based on the start_daemons parameter.
184
185   @type start_daemons: boolean
186   @param start_daemons: whther to also start the master
187       daemons (ganeti-masterd and ganeti-rapi)
188   @rtype: None
189
190   """
191   # GetMasterInfo will raise an exception if not able to return data
192   master_netdev, master_ip, _ = GetMasterInfo()[1]
193
194   payload = []
195   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
196     if utils.OwnIpAddress(master_ip):
197       # we already have the ip:
198       logging.debug("Master IP already configured, doing nothing")
199     else:
200       msg = "Someone else has the master ip, not activating"
201       logging.error(msg)
202       payload.append(msg)
203   else:
204     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
205                            "dev", master_netdev, "label",
206                            "%s:0" % master_netdev])
207     if result.failed:
208       msg = "Can't activate master IP: %s" % result.output
209       logging.error(msg)
210       payload.append(msg)
211
212     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
213                            "-s", master_ip, master_ip])
214     # we'll ignore the exit code of arping
215
216   # and now start the master and rapi daemons
217   if start_daemons:
218     for daemon in 'ganeti-masterd', 'ganeti-rapi':
219       result = utils.RunCmd([daemon])
220       if result.failed:
221         msg = "Can't start daemon %s: %s" % (daemon, result.output)
222         logging.error(msg)
223         payload.append(msg)
224
225   return not payload, "; ".join(payload)
226
227
228 def StopMaster(stop_daemons):
229   """Deactivate this node as master.
230
231   The function will always try to deactivate the IP address of the
232   master. It will also stop the master daemons depending on the
233   stop_daemons parameter.
234
235   @type stop_daemons: boolean
236   @param stop_daemons: whether to also stop the master daemons
237       (ganeti-masterd and ganeti-rapi)
238   @rtype: None
239
240   """
241   # TODO: log and report back to the caller the error failures; we
242   # need to decide in which case we fail the RPC for this
243
244   # GetMasterInfo will raise an exception if not able to return data
245   master_netdev, master_ip, _ = GetMasterInfo()[1]
246
247   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
248                          "dev", master_netdev])
249   if result.failed:
250     logging.error("Can't remove the master IP, error: %s", result.output)
251     # but otherwise ignore the failure
252
253   if stop_daemons:
254     # stop/kill the rapi and the master daemon
255     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
256       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
257
258   return True, None
259
260
261 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
262   """Joins this node to the cluster.
263
264   This does the following:
265       - updates the hostkeys of the machine (rsa and dsa)
266       - adds the ssh private key to the user
267       - adds the ssh public key to the users' authorized_keys file
268
269   @type dsa: str
270   @param dsa: the DSA private key to write
271   @type dsapub: str
272   @param dsapub: the DSA public key to write
273   @type rsa: str
274   @param rsa: the RSA private key to write
275   @type rsapub: str
276   @param rsapub: the RSA public key to write
277   @type sshkey: str
278   @param sshkey: the SSH private key to write
279   @type sshpub: str
280   @param sshpub: the SSH public key to write
281   @rtype: boolean
282   @return: the success of the operation
283
284   """
285   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
286                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
287                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
288                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
289   for name, content, mode in sshd_keys:
290     utils.WriteFile(name, data=content, mode=mode)
291
292   try:
293     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
294                                                     mkdir=True)
295   except errors.OpExecError, err:
296     _Fail("Error while processing user ssh files: %s", err, exc=True)
297
298   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
299     utils.WriteFile(name, data=content, mode=0600)
300
301   utils.AddAuthorizedKey(auth_keys, sshpub)
302
303   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
304
305   return (True, "Node added successfully")
306
307
308 def LeaveCluster():
309   """Cleans up and remove the current node.
310
311   This function cleans up and prepares the current node to be removed
312   from the cluster.
313
314   If processing is successful, then it raises an
315   L{errors.QuitGanetiException} which is used as a special case to
316   shutdown the node daemon.
317
318   """
319   _CleanDirectory(constants.DATA_DIR)
320   JobQueuePurge()
321
322   try:
323     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
324
325     f = open(pub_key, 'r')
326     try:
327       utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
328     finally:
329       f.close()
330
331     utils.RemoveFile(priv_key)
332     utils.RemoveFile(pub_key)
333   except errors.OpExecError:
334     logging.exception("Error while processing ssh files")
335
336   # Raise a custom exception (handled in ganeti-noded)
337   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
338
339
340 def GetNodeInfo(vgname, hypervisor_type):
341   """Gives back a hash with different informations about the node.
342
343   @type vgname: C{string}
344   @param vgname: the name of the volume group to ask for disk space information
345   @type hypervisor_type: C{str}
346   @param hypervisor_type: the name of the hypervisor to ask for
347       memory information
348   @rtype: C{dict}
349   @return: dictionary with the following keys:
350       - vg_size is the size of the configured volume group in MiB
351       - vg_free is the free size of the volume group in MiB
352       - memory_dom0 is the memory allocated for domain0 in MiB
353       - memory_free is the currently available (free) ram in MiB
354       - memory_total is the total number of ram in MiB
355
356   """
357   outputarray = {}
358   vginfo = _GetVGInfo(vgname)
359   outputarray['vg_size'] = vginfo['vg_size']
360   outputarray['vg_free'] = vginfo['vg_free']
361
362   hyper = hypervisor.GetHypervisor(hypervisor_type)
363   hyp_info = hyper.GetNodeInfo()
364   if hyp_info is not None:
365     outputarray.update(hyp_info)
366
367   f = open("/proc/sys/kernel/random/boot_id", 'r')
368   try:
369     outputarray["bootid"] = f.read(128).rstrip("\n")
370   finally:
371     f.close()
372
373   return True, outputarray
374
375
376 def VerifyNode(what, cluster_name):
377   """Verify the status of the local node.
378
379   Based on the input L{what} parameter, various checks are done on the
380   local node.
381
382   If the I{filelist} key is present, this list of
383   files is checksummed and the file/checksum pairs are returned.
384
385   If the I{nodelist} key is present, we check that we have
386   connectivity via ssh with the target nodes (and check the hostname
387   report).
388
389   If the I{node-net-test} key is present, we check that we have
390   connectivity to the given nodes via both primary IP and, if
391   applicable, secondary IPs.
392
393   @type what: C{dict}
394   @param what: a dictionary of things to check:
395       - filelist: list of files for which to compute checksums
396       - nodelist: list of nodes we should check ssh communication with
397       - node-net-test: list of nodes we should check node daemon port
398         connectivity with
399       - hypervisor: list with hypervisors to run the verify for
400   @rtype: dict
401   @return: a dictionary with the same keys as the input dict, and
402       values representing the result of the checks
403
404   """
405   result = {}
406
407   if constants.NV_HYPERVISOR in what:
408     result[constants.NV_HYPERVISOR] = tmp = {}
409     for hv_name in what[constants.NV_HYPERVISOR]:
410       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
411
412   if constants.NV_FILELIST in what:
413     result[constants.NV_FILELIST] = utils.FingerprintFiles(
414       what[constants.NV_FILELIST])
415
416   if constants.NV_NODELIST in what:
417     result[constants.NV_NODELIST] = tmp = {}
418     random.shuffle(what[constants.NV_NODELIST])
419     for node in what[constants.NV_NODELIST]:
420       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
421       if not success:
422         tmp[node] = message
423
424   if constants.NV_NODENETTEST in what:
425     result[constants.NV_NODENETTEST] = tmp = {}
426     my_name = utils.HostInfo().name
427     my_pip = my_sip = None
428     for name, pip, sip in what[constants.NV_NODENETTEST]:
429       if name == my_name:
430         my_pip = pip
431         my_sip = sip
432         break
433     if not my_pip:
434       tmp[my_name] = ("Can't find my own primary/secondary IP"
435                       " in the node list")
436     else:
437       port = utils.GetNodeDaemonPort()
438       for name, pip, sip in what[constants.NV_NODENETTEST]:
439         fail = []
440         if not utils.TcpPing(pip, port, source=my_pip):
441           fail.append("primary")
442         if sip != pip:
443           if not utils.TcpPing(sip, port, source=my_sip):
444             fail.append("secondary")
445         if fail:
446           tmp[name] = ("failure using the %s interface(s)" %
447                        " and ".join(fail))
448
449   if constants.NV_LVLIST in what:
450     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
451
452   if constants.NV_INSTANCELIST in what:
453     result[constants.NV_INSTANCELIST] = GetInstanceList(
454       what[constants.NV_INSTANCELIST])
455
456   if constants.NV_VGLIST in what:
457     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
458
459   if constants.NV_VERSION in what:
460     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
461                                     constants.RELEASE_VERSION)
462
463   if constants.NV_HVINFO in what:
464     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
465     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
466
467   if constants.NV_DRBDLIST in what:
468     try:
469       used_minors = bdev.DRBD8.GetUsedDevs().keys()
470     except errors.BlockDeviceError, err:
471       logging.warning("Can't get used minors list", exc_info=True)
472       used_minors = str(err)
473     result[constants.NV_DRBDLIST] = used_minors
474
475   return True, result
476
477
478 def GetVolumeList(vg_name):
479   """Compute list of logical volumes and their size.
480
481   @type vg_name: str
482   @param vg_name: the volume group whose LVs we should list
483   @rtype: dict
484   @return:
485       dictionary of all partions (key) with value being a tuple of
486       their size (in MiB), inactive and online status::
487
488         {'test1': ('20.06', True, True)}
489
490       in case of errors, a string is returned with the error
491       details.
492
493   """
494   lvs = {}
495   sep = '|'
496   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
497                          "--separator=%s" % sep,
498                          "-olv_name,lv_size,lv_attr", vg_name])
499   if result.failed:
500     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
501
502   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
503   for line in result.stdout.splitlines():
504     line = line.strip()
505     match = valid_line_re.match(line)
506     if not match:
507       logging.error("Invalid line returned from lvs output: '%s'", line)
508       continue
509     name, size, attr = match.groups()
510     inactive = attr[4] == '-'
511     online = attr[5] == 'o'
512     lvs[name] = (size, inactive, online)
513
514   return lvs
515
516
517 def ListVolumeGroups():
518   """List the volume groups and their size.
519
520   @rtype: dict
521   @return: dictionary with keys volume name and values the
522       size of the volume
523
524   """
525   return True, utils.ListVolumeGroups()
526
527
528 def NodeVolumes():
529   """List all volumes on this node.
530
531   @rtype: list
532   @return:
533     A list of dictionaries, each having four keys:
534       - name: the logical volume name,
535       - size: the size of the logical volume
536       - dev: the physical device on which the LV lives
537       - vg: the volume group to which it belongs
538
539     In case of errors, we return an empty list and log the
540     error.
541
542     Note that since a logical volume can live on multiple physical
543     volumes, the resulting list might include a logical volume
544     multiple times.
545
546   """
547   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
548                          "--separator=|",
549                          "--options=lv_name,lv_size,devices,vg_name"])
550   if result.failed:
551     _Fail("Failed to list logical volumes, lvs output: %s",
552           result.output)
553
554   def parse_dev(dev):
555     if '(' in dev:
556       return dev.split('(')[0]
557     else:
558       return dev
559
560   def map_line(line):
561     return {
562       'name': line[0].strip(),
563       'size': line[1].strip(),
564       'dev': parse_dev(line[2].strip()),
565       'vg': line[3].strip(),
566     }
567
568   return True, [map_line(line.split('|'))
569                 for line in result.stdout.splitlines()
570                 if line.count('|') >= 3]
571
572
573 def BridgesExist(bridges_list):
574   """Check if a list of bridges exist on the current node.
575
576   @rtype: boolean
577   @return: C{True} if all of them exist, C{False} otherwise
578
579   """
580   missing = []
581   for bridge in bridges_list:
582     if not utils.BridgeExists(bridge):
583       missing.append(bridge)
584
585   if missing:
586     return False, "Missing bridges %s" % (", ".join(missing),)
587
588   return True, None
589
590
591 def GetInstanceList(hypervisor_list):
592   """Provides a list of instances.
593
594   @type hypervisor_list: list
595   @param hypervisor_list: the list of hypervisors to query information
596
597   @rtype: list
598   @return: a list of all running instances on the current node
599     - instance1.example.com
600     - instance2.example.com
601
602   """
603   results = []
604   for hname in hypervisor_list:
605     try:
606       names = hypervisor.GetHypervisor(hname).ListInstances()
607       results.extend(names)
608     except errors.HypervisorError, err:
609       _Fail("Error enumerating instances (hypervisor %s): %s",
610             hname, err, exc=True)
611
612   return results
613
614
615 def GetInstanceInfo(instance, hname):
616   """Gives back the informations about an instance as a dictionary.
617
618   @type instance: string
619   @param instance: the instance name
620   @type hname: string
621   @param hname: the hypervisor type of the instance
622
623   @rtype: dict
624   @return: dictionary with the following keys:
625       - memory: memory size of instance (int)
626       - state: xen state of instance (string)
627       - time: cpu time of instance (float)
628
629   """
630   output = {}
631
632   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
633   if iinfo is not None:
634     output['memory'] = iinfo[2]
635     output['state'] = iinfo[4]
636     output['time'] = iinfo[5]
637
638   return True, output
639
640
641 def GetInstanceMigratable(instance):
642   """Gives whether an instance can be migrated.
643
644   @type instance: L{objects.Instance}
645   @param instance: object representing the instance to be checked.
646
647   @rtype: tuple
648   @return: tuple of (result, description) where:
649       - result: whether the instance can be migrated or not
650       - description: a description of the issue, if relevant
651
652   """
653   hyper = hypervisor.GetHypervisor(instance.hypervisor)
654   if instance.name not in hyper.ListInstances():
655     return (False, 'not running')
656
657   for idx in range(len(instance.disks)):
658     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
659     if not os.path.islink(link_name):
660       return (False, 'not restarted since ganeti 1.2.5')
661
662   return (True, '')
663
664
665 def GetAllInstancesInfo(hypervisor_list):
666   """Gather data about all instances.
667
668   This is the equivalent of L{GetInstanceInfo}, except that it
669   computes data for all instances at once, thus being faster if one
670   needs data about more than one instance.
671
672   @type hypervisor_list: list
673   @param hypervisor_list: list of hypervisors to query for instance data
674
675   @rtype: dict
676   @return: dictionary of instance: data, with data having the following keys:
677       - memory: memory size of instance (int)
678       - state: xen state of instance (string)
679       - time: cpu time of instance (float)
680       - vcpus: the number of vcpus
681
682   """
683   output = {}
684
685   for hname in hypervisor_list:
686     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
687     if iinfo:
688       for name, inst_id, memory, vcpus, state, times in iinfo:
689         value = {
690           'memory': memory,
691           'vcpus': vcpus,
692           'state': state,
693           'time': times,
694           }
695         if name in output:
696           # we only check static parameters, like memory and vcpus,
697           # and not state and time which can change between the
698           # invocations of the different hypervisors
699           for key in 'memory', 'vcpus':
700             if value[key] != output[name][key]:
701               _Fail("Instance %s is running twice"
702                     " with different parameters", name)
703         output[name] = value
704
705   return True, output
706
707
708 def InstanceOsAdd(instance, reinstall):
709   """Add an OS to an instance.
710
711   @type instance: L{objects.Instance}
712   @param instance: Instance whose OS is to be installed
713   @type reinstall: boolean
714   @param reinstall: whether this is an instance reinstall
715   @rtype: boolean
716   @return: the success of the operation
717
718   """
719   try:
720     inst_os = OSFromDisk(instance.os)
721   except errors.InvalidOS, err:
722     os_name, os_dir, os_err = err.args
723     if os_dir is None:
724       return (False, "Can't find OS '%s': %s" % (os_name, os_err))
725     else:
726       return (False, "Error parsing OS '%s' in directory %s: %s" %
727               (os_name, os_dir, os_err))
728
729   create_env = OSEnvironment(instance)
730   if reinstall:
731     create_env['INSTANCE_REINSTALL'] = "1"
732
733   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
734                                      instance.name, int(time.time()))
735
736   result = utils.RunCmd([inst_os.create_script], env=create_env,
737                         cwd=inst_os.path, output=logfile,)
738   if result.failed:
739     logging.error("os create command '%s' returned error: %s, logfile: %s,"
740                   " output: %s", result.cmd, result.fail_reason, logfile,
741                   result.output)
742     lines = [utils.SafeEncode(val)
743              for val in utils.TailFile(logfile, lines=20)]
744     return (False, "OS create script failed (%s), last lines in the"
745             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
746
747   return (True, "Successfully installed")
748
749
750 def RunRenameInstance(instance, old_name):
751   """Run the OS rename script for an instance.
752
753   @type instance: L{objects.Instance}
754   @param instance: Instance whose OS is to be installed
755   @type old_name: string
756   @param old_name: previous instance name
757   @rtype: boolean
758   @return: the success of the operation
759
760   """
761   inst_os = OSFromDisk(instance.os)
762
763   rename_env = OSEnvironment(instance)
764   rename_env['OLD_INSTANCE_NAME'] = old_name
765
766   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
767                                            old_name,
768                                            instance.name, int(time.time()))
769
770   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
771                         cwd=inst_os.path, output=logfile)
772
773   if result.failed:
774     logging.error("os create command '%s' returned error: %s output: %s",
775                   result.cmd, result.fail_reason, result.output)
776     lines = [utils.SafeEncode(val)
777              for val in utils.TailFile(logfile, lines=20)]
778     return (False, "OS rename script failed (%s), last lines in the"
779             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
780
781   return (True, "Rename successful")
782
783
784 def _GetVGInfo(vg_name):
785   """Get informations about the volume group.
786
787   @type vg_name: str
788   @param vg_name: the volume group which we query
789   @rtype: dict
790   @return:
791     A dictionary with the following keys:
792       - C{vg_size} is the total size of the volume group in MiB
793       - C{vg_free} is the free size of the volume group in MiB
794       - C{pv_count} are the number of physical disks in that VG
795
796     If an error occurs during gathering of data, we return the same dict
797     with keys all set to None.
798
799   """
800   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
801
802   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
803                          "--nosuffix", "--units=m", "--separator=:", vg_name])
804
805   if retval.failed:
806     logging.error("volume group %s not present", vg_name)
807     return retdic
808   valarr = retval.stdout.strip().rstrip(':').split(':')
809   if len(valarr) == 3:
810     try:
811       retdic = {
812         "vg_size": int(round(float(valarr[0]), 0)),
813         "vg_free": int(round(float(valarr[1]), 0)),
814         "pv_count": int(valarr[2]),
815         }
816     except ValueError, err:
817       logging.exception("Fail to parse vgs output")
818   else:
819     logging.error("vgs output has the wrong number of fields (expected"
820                   " three): %s", str(valarr))
821   return retdic
822
823
824 def _GetBlockDevSymlinkPath(instance_name, idx):
825   return os.path.join(constants.DISK_LINKS_DIR,
826                       "%s:%d" % (instance_name, idx))
827
828
829 def _SymlinkBlockDev(instance_name, device_path, idx):
830   """Set up symlinks to a instance's block device.
831
832   This is an auxiliary function run when an instance is start (on the primary
833   node) or when an instance is migrated (on the target node).
834
835
836   @param instance_name: the name of the target instance
837   @param device_path: path of the physical block device, on the node
838   @param idx: the disk index
839   @return: absolute path to the disk's symlink
840
841   """
842   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
843   try:
844     os.symlink(device_path, link_name)
845   except OSError, err:
846     if err.errno == errno.EEXIST:
847       if (not os.path.islink(link_name) or
848           os.readlink(link_name) != device_path):
849         os.remove(link_name)
850         os.symlink(device_path, link_name)
851     else:
852       raise
853
854   return link_name
855
856
857 def _RemoveBlockDevLinks(instance_name, disks):
858   """Remove the block device symlinks belonging to the given instance.
859
860   """
861   for idx, disk in enumerate(disks):
862     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
863     if os.path.islink(link_name):
864       try:
865         os.remove(link_name)
866       except OSError:
867         logging.exception("Can't remove symlink '%s'", link_name)
868
869
870 def _GatherAndLinkBlockDevs(instance):
871   """Set up an instance's block device(s).
872
873   This is run on the primary node at instance startup. The block
874   devices must be already assembled.
875
876   @type instance: L{objects.Instance}
877   @param instance: the instance whose disks we shoul assemble
878   @rtype: list
879   @return: list of (disk_object, device_path)
880
881   """
882   block_devices = []
883   for idx, disk in enumerate(instance.disks):
884     device = _RecursiveFindBD(disk)
885     if device is None:
886       raise errors.BlockDeviceError("Block device '%s' is not set up." %
887                                     str(disk))
888     device.Open()
889     try:
890       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
891     except OSError, e:
892       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
893                                     e.strerror)
894
895     block_devices.append((disk, link_name))
896
897   return block_devices
898
899
900 def StartInstance(instance):
901   """Start an instance.
902
903   @type instance: L{objects.Instance}
904   @param instance: the instance object
905   @rtype: boolean
906   @return: whether the startup was successful or not
907
908   """
909   running_instances = GetInstanceList([instance.hypervisor])
910
911   if instance.name in running_instances:
912     return (True, "Already running")
913
914   try:
915     block_devices = _GatherAndLinkBlockDevs(instance)
916     hyper = hypervisor.GetHypervisor(instance.hypervisor)
917     hyper.StartInstance(instance, block_devices)
918   except errors.BlockDeviceError, err:
919     _Fail("Block device error: %s", err, exc=True)
920   except errors.HypervisorError, err:
921     _RemoveBlockDevLinks(instance.name, instance.disks)
922     _Fail("Hypervisor error: %s", err, exc=True)
923
924   return (True, "Instance started successfully")
925
926
927 def InstanceShutdown(instance):
928   """Shut an instance down.
929
930   @note: this functions uses polling with a hardcoded timeout.
931
932   @type instance: L{objects.Instance}
933   @param instance: the instance object
934   @rtype: boolean
935   @return: whether the startup was successful or not
936
937   """
938   hv_name = instance.hypervisor
939   running_instances = GetInstanceList([hv_name])
940
941   if instance.name not in running_instances:
942     return (True, "Instance already stopped")
943
944   hyper = hypervisor.GetHypervisor(hv_name)
945   try:
946     hyper.StopInstance(instance)
947   except errors.HypervisorError, err:
948     _Fail("Failed to stop instance %s: %s", instance.name, err)
949
950   # test every 10secs for 2min
951
952   time.sleep(1)
953   for dummy in range(11):
954     if instance.name not in GetInstanceList([hv_name]):
955       break
956     time.sleep(10)
957   else:
958     # the shutdown did not succeed
959     logging.error("Shutdown of '%s' unsuccessful, using destroy",
960                   instance.name)
961
962     try:
963       hyper.StopInstance(instance, force=True)
964     except errors.HypervisorError, err:
965       _Fail("Failed to force stop instance %s: %s", instance.name, err)
966
967     time.sleep(1)
968     if instance.name in GetInstanceList([hv_name]):
969       _Fail("Could not shutdown instance %s even by destroy", instance.name)
970
971   _RemoveBlockDevLinks(instance.name, instance.disks)
972
973   return (True, "Instance has been shutdown successfully")
974
975
976 def InstanceReboot(instance, reboot_type):
977   """Reboot an instance.
978
979   @type instance: L{objects.Instance}
980   @param instance: the instance object to reboot
981   @type reboot_type: str
982   @param reboot_type: the type of reboot, one the following
983     constants:
984       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
985         instance OS, do not recreate the VM
986       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
987         restart the VM (at the hypervisor level)
988       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
989         is not accepted here, since that mode is handled
990         differently
991   @rtype: boolean
992   @return: the success of the operation
993
994   """
995   running_instances = GetInstanceList([instance.hypervisor])
996
997   if instance.name not in running_instances:
998     _Fail("Cannot reboot instance %s that is not running", instance.name)
999
1000   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1001   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1002     try:
1003       hyper.RebootInstance(instance)
1004     except errors.HypervisorError, err:
1005       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1006   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1007     try:
1008       stop_result = InstanceShutdown(instance)
1009       if not stop_result[0]:
1010         return stop_result
1011       return StartInstance(instance)
1012     except errors.HypervisorError, err:
1013       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1014   else:
1015     _Fail("Invalid reboot_type received: %s", reboot_type)
1016
1017   return (True, "Reboot successful")
1018
1019
1020 def MigrationInfo(instance):
1021   """Gather information about an instance to be migrated.
1022
1023   @type instance: L{objects.Instance}
1024   @param instance: the instance definition
1025
1026   """
1027   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1028   try:
1029     info = hyper.MigrationInfo(instance)
1030   except errors.HypervisorError, err:
1031     _Fail("Failed to fetch migration information: %s", err, exc=True)
1032   return (True, info)
1033
1034
1035 def AcceptInstance(instance, info, target):
1036   """Prepare the node to accept an instance.
1037
1038   @type instance: L{objects.Instance}
1039   @param instance: the instance definition
1040   @type info: string/data (opaque)
1041   @param info: migration information, from the source node
1042   @type target: string
1043   @param target: target host (usually ip), on this node
1044
1045   """
1046   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1047   try:
1048     hyper.AcceptInstance(instance, info, target)
1049   except errors.HypervisorError, err:
1050     _Fail("Failed to accept instance: %s", err, exc=True)
1051   return (True, "Accept successfull")
1052
1053
1054 def FinalizeMigration(instance, info, success):
1055   """Finalize any preparation to accept an instance.
1056
1057   @type instance: L{objects.Instance}
1058   @param instance: the instance definition
1059   @type info: string/data (opaque)
1060   @param info: migration information, from the source node
1061   @type success: boolean
1062   @param success: whether the migration was a success or a failure
1063
1064   """
1065   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1066   try:
1067     hyper.FinalizeMigration(instance, info, success)
1068   except errors.HypervisorError, err:
1069     _Fail("Failed to finalize migration: %s", err, exc=True)
1070   return (True, "Migration Finalized")
1071
1072
1073 def MigrateInstance(instance, target, live):
1074   """Migrates an instance to another node.
1075
1076   @type instance: L{objects.Instance}
1077   @param instance: the instance definition
1078   @type target: string
1079   @param target: the target node name
1080   @type live: boolean
1081   @param live: whether the migration should be done live or not (the
1082       interpretation of this parameter is left to the hypervisor)
1083   @rtype: tuple
1084   @return: a tuple of (success, msg) where:
1085       - succes is a boolean denoting the success/failure of the operation
1086       - msg is a string with details in case of failure
1087
1088   """
1089   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1090
1091   try:
1092     hyper.MigrateInstance(instance.name, target, live)
1093   except errors.HypervisorError, err:
1094     _Fail("Failed to migrate instance: %s", err, exc=True)
1095   return (True, "Migration successfull")
1096
1097
1098 def BlockdevCreate(disk, size, owner, on_primary, info):
1099   """Creates a block device for an instance.
1100
1101   @type disk: L{objects.Disk}
1102   @param disk: the object describing the disk we should create
1103   @type size: int
1104   @param size: the size of the physical underlying device, in MiB
1105   @type owner: str
1106   @param owner: the name of the instance for which disk is created,
1107       used for device cache data
1108   @type on_primary: boolean
1109   @param on_primary:  indicates if it is the primary node or not
1110   @type info: string
1111   @param info: string that will be sent to the physical device
1112       creation, used for example to set (LVM) tags on LVs
1113
1114   @return: the new unique_id of the device (this can sometime be
1115       computed only after creation), or None. On secondary nodes,
1116       it's not required to return anything.
1117
1118   """
1119   clist = []
1120   if disk.children:
1121     for child in disk.children:
1122       try:
1123         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1124       except errors.BlockDeviceError, err:
1125         _Fail("Can't assemble device %s: %s", child, err)
1126       if on_primary or disk.AssembleOnSecondary():
1127         # we need the children open in case the device itself has to
1128         # be assembled
1129         try:
1130           crdev.Open()
1131         except errors.BlockDeviceError, err:
1132           _Fail("Can't make child '%s' read-write: %s", child, err)
1133       clist.append(crdev)
1134
1135   try:
1136     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1137   except errors.BlockDeviceError, err:
1138     _Fail("Can't create block device: %s", err)
1139
1140   if on_primary or disk.AssembleOnSecondary():
1141     try:
1142       device.Assemble()
1143     except errors.BlockDeviceError, err:
1144       _Fail("Can't assemble device after creation, unusual event: %s", err)
1145     device.SetSyncSpeed(constants.SYNC_SPEED)
1146     if on_primary or disk.OpenOnSecondary():
1147       try:
1148         device.Open(force=True)
1149       except errors.BlockDeviceError, err:
1150         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1151     DevCacheManager.UpdateCache(device.dev_path, owner,
1152                                 on_primary, disk.iv_name)
1153
1154   device.SetInfo(info)
1155
1156   physical_id = device.unique_id
1157   return True, physical_id
1158
1159
1160 def BlockdevRemove(disk):
1161   """Remove a block device.
1162
1163   @note: This is intended to be called recursively.
1164
1165   @type disk: L{objects.Disk}
1166   @param disk: the disk object we should remove
1167   @rtype: boolean
1168   @return: the success of the operation
1169
1170   """
1171   msgs = []
1172   result = True
1173   try:
1174     rdev = _RecursiveFindBD(disk)
1175   except errors.BlockDeviceError, err:
1176     # probably can't attach
1177     logging.info("Can't attach to device %s in remove", disk)
1178     rdev = None
1179   if rdev is not None:
1180     r_path = rdev.dev_path
1181     try:
1182       rdev.Remove()
1183     except errors.BlockDeviceError, err:
1184       msgs.append(str(err))
1185       result = False
1186     if result:
1187       DevCacheManager.RemoveCache(r_path)
1188
1189   if disk.children:
1190     for child in disk.children:
1191       c_status, c_msg = BlockdevRemove(child)
1192       result = result and c_status
1193       if c_msg: # not an empty message
1194         msgs.append(c_msg)
1195
1196   return (result, "; ".join(msgs))
1197
1198
1199 def _RecursiveAssembleBD(disk, owner, as_primary):
1200   """Activate a block device for an instance.
1201
1202   This is run on the primary and secondary nodes for an instance.
1203
1204   @note: this function is called recursively.
1205
1206   @type disk: L{objects.Disk}
1207   @param disk: the disk we try to assemble
1208   @type owner: str
1209   @param owner: the name of the instance which owns the disk
1210   @type as_primary: boolean
1211   @param as_primary: if we should make the block device
1212       read/write
1213
1214   @return: the assembled device or None (in case no device
1215       was assembled)
1216   @raise errors.BlockDeviceError: in case there is an error
1217       during the activation of the children or the device
1218       itself
1219
1220   """
1221   children = []
1222   if disk.children:
1223     mcn = disk.ChildrenNeeded()
1224     if mcn == -1:
1225       mcn = 0 # max number of Nones allowed
1226     else:
1227       mcn = len(disk.children) - mcn # max number of Nones
1228     for chld_disk in disk.children:
1229       try:
1230         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1231       except errors.BlockDeviceError, err:
1232         if children.count(None) >= mcn:
1233           raise
1234         cdev = None
1235         logging.error("Error in child activation (but continuing): %s",
1236                       str(err))
1237       children.append(cdev)
1238
1239   if as_primary or disk.AssembleOnSecondary():
1240     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1241     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1242     result = r_dev
1243     if as_primary or disk.OpenOnSecondary():
1244       r_dev.Open()
1245     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1246                                 as_primary, disk.iv_name)
1247
1248   else:
1249     result = True
1250   return result
1251
1252
1253 def BlockdevAssemble(disk, owner, as_primary):
1254   """Activate a block device for an instance.
1255
1256   This is a wrapper over _RecursiveAssembleBD.
1257
1258   @rtype: str or boolean
1259   @return: a C{/dev/...} path for primary nodes, and
1260       C{True} for secondary nodes
1261
1262   """
1263   status = True
1264   result = "no error information"
1265   try:
1266     result = _RecursiveAssembleBD(disk, owner, as_primary)
1267     if isinstance(result, bdev.BlockDev):
1268       result = result.dev_path
1269   except errors.BlockDeviceError, err:
1270     result = "Error while assembling disk: %s" % str(err)
1271     status = False
1272   return (status, result)
1273
1274
1275 def BlockdevShutdown(disk):
1276   """Shut down a block device.
1277
1278   First, if the device is assembled (Attach() is successfull), then
1279   the device is shutdown. Then the children of the device are
1280   shutdown.
1281
1282   This function is called recursively. Note that we don't cache the
1283   children or such, as oppossed to assemble, shutdown of different
1284   devices doesn't require that the upper device was active.
1285
1286   @type disk: L{objects.Disk}
1287   @param disk: the description of the disk we should
1288       shutdown
1289   @rtype: boolean
1290   @return: the success of the operation
1291
1292   """
1293   msgs = []
1294   result = True
1295   r_dev = _RecursiveFindBD(disk)
1296   if r_dev is not None:
1297     r_path = r_dev.dev_path
1298     try:
1299       r_dev.Shutdown()
1300       DevCacheManager.RemoveCache(r_path)
1301     except errors.BlockDeviceError, err:
1302       msgs.append(str(err))
1303       result = False
1304
1305   if disk.children:
1306     for child in disk.children:
1307       c_status, c_msg = BlockdevShutdown(child)
1308       result = result and c_status
1309       if c_msg: # not an empty message
1310         msgs.append(c_msg)
1311
1312   return (result, "; ".join(msgs))
1313
1314
1315 def BlockdevAddchildren(parent_cdev, new_cdevs):
1316   """Extend a mirrored block device.
1317
1318   @type parent_cdev: L{objects.Disk}
1319   @param parent_cdev: the disk to which we should add children
1320   @type new_cdevs: list of L{objects.Disk}
1321   @param new_cdevs: the list of children which we should add
1322   @rtype: boolean
1323   @return: the success of the operation
1324
1325   """
1326   parent_bdev = _RecursiveFindBD(parent_cdev)
1327   if parent_bdev is None:
1328     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1329   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1330   if new_bdevs.count(None) > 0:
1331     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1332   parent_bdev.AddChildren(new_bdevs)
1333   return (True, None)
1334
1335
1336 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1337   """Shrink a mirrored block device.
1338
1339   @type parent_cdev: L{objects.Disk}
1340   @param parent_cdev: the disk from which we should remove children
1341   @type new_cdevs: list of L{objects.Disk}
1342   @param new_cdevs: the list of children which we should remove
1343   @rtype: boolean
1344   @return: the success of the operation
1345
1346   """
1347   parent_bdev = _RecursiveFindBD(parent_cdev)
1348   if parent_bdev is None:
1349     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1350   devs = []
1351   for disk in new_cdevs:
1352     rpath = disk.StaticDevPath()
1353     if rpath is None:
1354       bd = _RecursiveFindBD(disk)
1355       if bd is None:
1356         _Fail("Can't find device %s while removing children", disk)
1357       else:
1358         devs.append(bd.dev_path)
1359     else:
1360       devs.append(rpath)
1361   parent_bdev.RemoveChildren(devs)
1362   return (True, None)
1363
1364
1365 def BlockdevGetmirrorstatus(disks):
1366   """Get the mirroring status of a list of devices.
1367
1368   @type disks: list of L{objects.Disk}
1369   @param disks: the list of disks which we should query
1370   @rtype: disk
1371   @return:
1372       a list of (mirror_done, estimated_time) tuples, which
1373       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1374   @raise errors.BlockDeviceError: if any of the disks cannot be
1375       found
1376
1377   """
1378   stats = []
1379   for dsk in disks:
1380     rbd = _RecursiveFindBD(dsk)
1381     if rbd is None:
1382       _Fail("Can't find device %s", dsk)
1383     stats.append(rbd.CombinedSyncStatus())
1384   return True, stats
1385
1386
1387 def _RecursiveFindBD(disk):
1388   """Check if a device is activated.
1389
1390   If so, return informations about the real device.
1391
1392   @type disk: L{objects.Disk}
1393   @param disk: the disk object we need to find
1394
1395   @return: None if the device can't be found,
1396       otherwise the device instance
1397
1398   """
1399   children = []
1400   if disk.children:
1401     for chdisk in disk.children:
1402       children.append(_RecursiveFindBD(chdisk))
1403
1404   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1405
1406
1407 def BlockdevFind(disk):
1408   """Check if a device is activated.
1409
1410   If it is, return informations about the real device.
1411
1412   @type disk: L{objects.Disk}
1413   @param disk: the disk to find
1414   @rtype: None or tuple
1415   @return: None if the disk cannot be found, otherwise a
1416       tuple (device_path, major, minor, sync_percent,
1417       estimated_time, is_degraded)
1418
1419   """
1420   try:
1421     rbd = _RecursiveFindBD(disk)
1422   except errors.BlockDeviceError, err:
1423     _Fail("Failed to find device: %s", err, exc=True)
1424   if rbd is None:
1425     return (True, None)
1426   return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1427
1428
1429 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1430   """Write a file to the filesystem.
1431
1432   This allows the master to overwrite(!) a file. It will only perform
1433   the operation if the file belongs to a list of configuration files.
1434
1435   @type file_name: str
1436   @param file_name: the target file name
1437   @type data: str
1438   @param data: the new contents of the file
1439   @type mode: int
1440   @param mode: the mode to give the file (can be None)
1441   @type uid: int
1442   @param uid: the owner of the file (can be -1 for default)
1443   @type gid: int
1444   @param gid: the group of the file (can be -1 for default)
1445   @type atime: float
1446   @param atime: the atime to set on the file (can be None)
1447   @type mtime: float
1448   @param mtime: the mtime to set on the file (can be None)
1449   @rtype: boolean
1450   @return: the success of the operation; errors are logged
1451       in the node daemon log
1452
1453   """
1454   if not os.path.isabs(file_name):
1455     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1456
1457   allowed_files = set([
1458     constants.CLUSTER_CONF_FILE,
1459     constants.ETC_HOSTS,
1460     constants.SSH_KNOWN_HOSTS_FILE,
1461     constants.VNC_PASSWORD_FILE,
1462     constants.RAPI_CERT_FILE,
1463     constants.RAPI_USERS_FILE,
1464     ])
1465
1466   for hv_name in constants.HYPER_TYPES:
1467     hv_class = hypervisor.GetHypervisor(hv_name)
1468     allowed_files.update(hv_class.GetAncillaryFiles())
1469
1470   if file_name not in allowed_files:
1471     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1472           file_name)
1473
1474   raw_data = _Decompress(data)
1475
1476   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1477                   atime=atime, mtime=mtime)
1478   return (True, "success")
1479
1480
1481 def WriteSsconfFiles(values):
1482   """Update all ssconf files.
1483
1484   Wrapper around the SimpleStore.WriteFiles.
1485
1486   """
1487   ssconf.SimpleStore().WriteFiles(values)
1488
1489
1490 def _ErrnoOrStr(err):
1491   """Format an EnvironmentError exception.
1492
1493   If the L{err} argument has an errno attribute, it will be looked up
1494   and converted into a textual C{E...} description. Otherwise the
1495   string representation of the error will be returned.
1496
1497   @type err: L{EnvironmentError}
1498   @param err: the exception to format
1499
1500   """
1501   if hasattr(err, 'errno'):
1502     detail = errno.errorcode[err.errno]
1503   else:
1504     detail = str(err)
1505   return detail
1506
1507
1508 def _OSOndiskVersion(name, os_dir):
1509   """Compute and return the API version of a given OS.
1510
1511   This function will try to read the API version of the OS given by
1512   the 'name' parameter and residing in the 'os_dir' directory.
1513
1514   @type name: str
1515   @param name: the OS name we should look for
1516   @type os_dir: str
1517   @param os_dir: the directory inwhich we should look for the OS
1518   @rtype: int or None
1519   @return:
1520       Either an integer denoting the version or None in the
1521       case when this is not a valid OS name.
1522   @raise errors.InvalidOS: if the OS cannot be found
1523
1524   """
1525   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1526
1527   try:
1528     st = os.stat(api_file)
1529   except EnvironmentError, err:
1530     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1531                            " found (%s)" % _ErrnoOrStr(err))
1532
1533   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1534     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1535                            " a regular file")
1536
1537   try:
1538     f = open(api_file)
1539     try:
1540       api_versions = f.readlines()
1541     finally:
1542       f.close()
1543   except EnvironmentError, err:
1544     raise errors.InvalidOS(name, os_dir, "error while reading the"
1545                            " API version (%s)" % _ErrnoOrStr(err))
1546
1547   api_versions = [version.strip() for version in api_versions]
1548   try:
1549     api_versions = [int(version) for version in api_versions]
1550   except (TypeError, ValueError), err:
1551     raise errors.InvalidOS(name, os_dir,
1552                            "API version is not integer (%s)" % str(err))
1553
1554   return api_versions
1555
1556
1557 def DiagnoseOS(top_dirs=None):
1558   """Compute the validity for all OSes.
1559
1560   @type top_dirs: list
1561   @param top_dirs: the list of directories in which to
1562       search (if not given defaults to
1563       L{constants.OS_SEARCH_PATH})
1564   @rtype: list of L{objects.OS}
1565   @return: an OS object for each name in all the given
1566       directories
1567
1568   """
1569   if top_dirs is None:
1570     top_dirs = constants.OS_SEARCH_PATH
1571
1572   result = []
1573   for dir_name in top_dirs:
1574     if os.path.isdir(dir_name):
1575       try:
1576         f_names = utils.ListVisibleFiles(dir_name)
1577       except EnvironmentError, err:
1578         logging.exception("Can't list the OS directory %s", dir_name)
1579         break
1580       for name in f_names:
1581         try:
1582           os_inst = OSFromDisk(name, base_dir=dir_name)
1583           result.append(os_inst)
1584         except errors.InvalidOS, err:
1585           result.append(objects.OS.FromInvalidOS(err))
1586
1587   return result
1588
1589
1590 def OSFromDisk(name, base_dir=None):
1591   """Create an OS instance from disk.
1592
1593   This function will return an OS instance if the given name is a
1594   valid OS name. Otherwise, it will raise an appropriate
1595   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1596
1597   @type base_dir: string
1598   @keyword base_dir: Base directory containing OS installations.
1599                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1600   @rtype: L{objects.OS}
1601   @return: the OS instance if we find a valid one
1602   @raise errors.InvalidOS: if we don't find a valid OS
1603
1604   """
1605   if base_dir is None:
1606     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1607     if os_dir is None:
1608       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1609   else:
1610     os_dir = os.path.sep.join([base_dir, name])
1611
1612   api_versions = _OSOndiskVersion(name, os_dir)
1613
1614   if constants.OS_API_VERSION not in api_versions:
1615     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1616                            " (found %s want %s)"
1617                            % (api_versions, constants.OS_API_VERSION))
1618
1619   # OS Scripts dictionary, we will populate it with the actual script names
1620   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1621
1622   for script in os_scripts:
1623     os_scripts[script] = os.path.sep.join([os_dir, script])
1624
1625     try:
1626       st = os.stat(os_scripts[script])
1627     except EnvironmentError, err:
1628       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1629                              (script, _ErrnoOrStr(err)))
1630
1631     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1632       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1633                              script)
1634
1635     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1636       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1637                              script)
1638
1639
1640   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1641                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1642                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1643                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1644                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1645                     api_versions=api_versions)
1646
1647 def OSEnvironment(instance, debug=0):
1648   """Calculate the environment for an os script.
1649
1650   @type instance: L{objects.Instance}
1651   @param instance: target instance for the os script run
1652   @type debug: integer
1653   @param debug: debug level (0 or 1, for OS Api 10)
1654   @rtype: dict
1655   @return: dict of environment variables
1656   @raise errors.BlockDeviceError: if the block device
1657       cannot be found
1658
1659   """
1660   result = {}
1661   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1662   result['INSTANCE_NAME'] = instance.name
1663   result['INSTANCE_OS'] = instance.os
1664   result['HYPERVISOR'] = instance.hypervisor
1665   result['DISK_COUNT'] = '%d' % len(instance.disks)
1666   result['NIC_COUNT'] = '%d' % len(instance.nics)
1667   result['DEBUG_LEVEL'] = '%d' % debug
1668   for idx, disk in enumerate(instance.disks):
1669     real_disk = _RecursiveFindBD(disk)
1670     if real_disk is None:
1671       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1672                                     str(disk))
1673     real_disk.Open()
1674     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1675     result['DISK_%d_ACCESS' % idx] = disk.mode
1676     if constants.HV_DISK_TYPE in instance.hvparams:
1677       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1678         instance.hvparams[constants.HV_DISK_TYPE]
1679     if disk.dev_type in constants.LDS_BLOCK:
1680       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1681     elif disk.dev_type == constants.LD_FILE:
1682       result['DISK_%d_BACKEND_TYPE' % idx] = \
1683         'file:%s' % disk.physical_id[0]
1684   for idx, nic in enumerate(instance.nics):
1685     result['NIC_%d_MAC' % idx] = nic.mac
1686     if nic.ip:
1687       result['NIC_%d_IP' % idx] = nic.ip
1688     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1689     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1690       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1691     if nic.nicparams[constants.NIC_LINK]:
1692       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1693     if constants.HV_NIC_TYPE in instance.hvparams:
1694       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1695         instance.hvparams[constants.HV_NIC_TYPE]
1696
1697   return result
1698
1699 def BlockdevGrow(disk, amount):
1700   """Grow a stack of block devices.
1701
1702   This function is called recursively, with the childrens being the
1703   first ones to resize.
1704
1705   @type disk: L{objects.Disk}
1706   @param disk: the disk to be grown
1707   @rtype: (status, result)
1708   @return: a tuple with the status of the operation
1709       (True/False), and the errors message if status
1710       is False
1711
1712   """
1713   r_dev = _RecursiveFindBD(disk)
1714   if r_dev is None:
1715     return False, "Cannot find block device %s" % (disk,)
1716
1717   try:
1718     r_dev.Grow(amount)
1719   except errors.BlockDeviceError, err:
1720     _Fail("Failed to grow block device: %s", err, exc=True)
1721
1722   return True, None
1723
1724
1725 def BlockdevSnapshot(disk):
1726   """Create a snapshot copy of a block device.
1727
1728   This function is called recursively, and the snapshot is actually created
1729   just for the leaf lvm backend device.
1730
1731   @type disk: L{objects.Disk}
1732   @param disk: the disk to be snapshotted
1733   @rtype: string
1734   @return: snapshot disk path
1735
1736   """
1737   if disk.children:
1738     if len(disk.children) == 1:
1739       # only one child, let's recurse on it
1740       return BlockdevSnapshot(disk.children[0])
1741     else:
1742       # more than one child, choose one that matches
1743       for child in disk.children:
1744         if child.size == disk.size:
1745           # return implies breaking the loop
1746           return BlockdevSnapshot(child)
1747   elif disk.dev_type == constants.LD_LV:
1748     r_dev = _RecursiveFindBD(disk)
1749     if r_dev is not None:
1750       # let's stay on the safe side and ask for the full size, for now
1751       return True, r_dev.Snapshot(disk.size)
1752     else:
1753       _Fail("Cannot find block device %s", disk)
1754   else:
1755     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1756           disk.unique_id, disk.dev_type)
1757
1758
1759 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1760   """Export a block device snapshot to a remote node.
1761
1762   @type disk: L{objects.Disk}
1763   @param disk: the description of the disk to export
1764   @type dest_node: str
1765   @param dest_node: the destination node to export to
1766   @type instance: L{objects.Instance}
1767   @param instance: the instance object to whom the disk belongs
1768   @type cluster_name: str
1769   @param cluster_name: the cluster name, needed for SSH hostalias
1770   @type idx: int
1771   @param idx: the index of the disk in the instance's disk list,
1772       used to export to the OS scripts environment
1773   @rtype: boolean
1774   @return: the success of the operation
1775
1776   """
1777   export_env = OSEnvironment(instance)
1778
1779   inst_os = OSFromDisk(instance.os)
1780   export_script = inst_os.export_script
1781
1782   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1783                                      instance.name, int(time.time()))
1784   if not os.path.exists(constants.LOG_OS_DIR):
1785     os.mkdir(constants.LOG_OS_DIR, 0750)
1786   real_disk = _RecursiveFindBD(disk)
1787   if real_disk is None:
1788     _Fail("Block device '%s' is not set up", disk)
1789
1790   real_disk.Open()
1791
1792   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1793   export_env['EXPORT_INDEX'] = str(idx)
1794
1795   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1796   destfile = disk.physical_id[1]
1797
1798   # the target command is built out of three individual commands,
1799   # which are joined by pipes; we check each individual command for
1800   # valid parameters
1801   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1802                                export_script, logfile)
1803
1804   comprcmd = "gzip"
1805
1806   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1807                                 destdir, destdir, destfile)
1808   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1809                                                    constants.GANETI_RUNAS,
1810                                                    destcmd)
1811
1812   # all commands have been checked, so we're safe to combine them
1813   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1814
1815   result = utils.RunCmd(command, env=export_env)
1816
1817   if result.failed:
1818     _Fail("OS snapshot export command '%s' returned error: %s"
1819           " output: %s", command, result.fail_reason, result.output)
1820
1821   return (True, None)
1822
1823
1824 def FinalizeExport(instance, snap_disks):
1825   """Write out the export configuration information.
1826
1827   @type instance: L{objects.Instance}
1828   @param instance: the instance which we export, used for
1829       saving configuration
1830   @type snap_disks: list of L{objects.Disk}
1831   @param snap_disks: list of snapshot block devices, which
1832       will be used to get the actual name of the dump file
1833
1834   @rtype: boolean
1835   @return: the success of the operation
1836
1837   """
1838   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1839   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1840
1841   config = objects.SerializableConfigParser()
1842
1843   config.add_section(constants.INISECT_EXP)
1844   config.set(constants.INISECT_EXP, 'version', '0')
1845   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1846   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1847   config.set(constants.INISECT_EXP, 'os', instance.os)
1848   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1849
1850   config.add_section(constants.INISECT_INS)
1851   config.set(constants.INISECT_INS, 'name', instance.name)
1852   config.set(constants.INISECT_INS, 'memory', '%d' %
1853              instance.beparams[constants.BE_MEMORY])
1854   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1855              instance.beparams[constants.BE_VCPUS])
1856   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1857
1858   nic_total = 0
1859   for nic_count, nic in enumerate(instance.nics):
1860     nic_total += 1
1861     config.set(constants.INISECT_INS, 'nic%d_mac' %
1862                nic_count, '%s' % nic.mac)
1863     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1864     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1865                '%s' % nic.bridge)
1866   # TODO: redundant: on load can read nics until it doesn't exist
1867   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1868
1869   disk_total = 0
1870   for disk_count, disk in enumerate(snap_disks):
1871     if disk:
1872       disk_total += 1
1873       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1874                  ('%s' % disk.iv_name))
1875       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1876                  ('%s' % disk.physical_id[1]))
1877       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1878                  ('%d' % disk.size))
1879
1880   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1881
1882   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1883                   data=config.Dumps())
1884   shutil.rmtree(finaldestdir, True)
1885   shutil.move(destdir, finaldestdir)
1886
1887   return True, None
1888
1889
1890 def ExportInfo(dest):
1891   """Get export configuration information.
1892
1893   @type dest: str
1894   @param dest: directory containing the export
1895
1896   @rtype: L{objects.SerializableConfigParser}
1897   @return: a serializable config file containing the
1898       export info
1899
1900   """
1901   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1902
1903   config = objects.SerializableConfigParser()
1904   config.read(cff)
1905
1906   if (not config.has_section(constants.INISECT_EXP) or
1907       not config.has_section(constants.INISECT_INS)):
1908     _Fail("Export info file doesn't have the required fields")
1909
1910   return True, config.Dumps()
1911
1912
1913 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1914   """Import an os image into an instance.
1915
1916   @type instance: L{objects.Instance}
1917   @param instance: instance to import the disks into
1918   @type src_node: string
1919   @param src_node: source node for the disk images
1920   @type src_images: list of string
1921   @param src_images: absolute paths of the disk images
1922   @rtype: list of boolean
1923   @return: each boolean represent the success of importing the n-th disk
1924
1925   """
1926   import_env = OSEnvironment(instance)
1927   inst_os = OSFromDisk(instance.os)
1928   import_script = inst_os.import_script
1929
1930   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1931                                         instance.name, int(time.time()))
1932   if not os.path.exists(constants.LOG_OS_DIR):
1933     os.mkdir(constants.LOG_OS_DIR, 0750)
1934
1935   comprcmd = "gunzip"
1936   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1937                                import_script, logfile)
1938
1939   final_result = []
1940   for idx, image in enumerate(src_images):
1941     if image:
1942       destcmd = utils.BuildShellCmd('cat %s', image)
1943       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1944                                                        constants.GANETI_RUNAS,
1945                                                        destcmd)
1946       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1947       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1948       import_env['IMPORT_INDEX'] = str(idx)
1949       result = utils.RunCmd(command, env=import_env)
1950       if result.failed:
1951         logging.error("Disk import command '%s' returned error: %s"
1952                       " output: %s", command, result.fail_reason,
1953                       result.output)
1954         final_result.append("error importing disk %d: %s, %s" %
1955                             (idx, result.fail_reason, result.output[-100]))
1956
1957   if final_result:
1958     return False, "; ".join(final_result)
1959   return True, None
1960
1961
1962 def ListExports():
1963   """Return a list of exports currently available on this machine.
1964
1965   @rtype: list
1966   @return: list of the exports
1967
1968   """
1969   if os.path.isdir(constants.EXPORT_DIR):
1970     return True, utils.ListVisibleFiles(constants.EXPORT_DIR)
1971   else:
1972     return False, "No exports directory"
1973
1974
1975 def RemoveExport(export):
1976   """Remove an existing export from the node.
1977
1978   @type export: str
1979   @param export: the name of the export to remove
1980   @rtype: boolean
1981   @return: the success of the operation
1982
1983   """
1984   target = os.path.join(constants.EXPORT_DIR, export)
1985
1986   try:
1987     shutil.rmtree(target)
1988   except EnvironmentError, err:
1989     _Fail("Error while removing the export: %s", err, exc=True)
1990
1991   return True, None
1992
1993
1994 def BlockdevRename(devlist):
1995   """Rename a list of block devices.
1996
1997   @type devlist: list of tuples
1998   @param devlist: list of tuples of the form  (disk,
1999       new_logical_id, new_physical_id); disk is an
2000       L{objects.Disk} object describing the current disk,
2001       and new logical_id/physical_id is the name we
2002       rename it to
2003   @rtype: boolean
2004   @return: True if all renames succeeded, False otherwise
2005
2006   """
2007   msgs = []
2008   result = True
2009   for disk, unique_id in devlist:
2010     dev = _RecursiveFindBD(disk)
2011     if dev is None:
2012       msgs.append("Can't find device %s in rename" % str(disk))
2013       result = False
2014       continue
2015     try:
2016       old_rpath = dev.dev_path
2017       dev.Rename(unique_id)
2018       new_rpath = dev.dev_path
2019       if old_rpath != new_rpath:
2020         DevCacheManager.RemoveCache(old_rpath)
2021         # FIXME: we should add the new cache information here, like:
2022         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2023         # but we don't have the owner here - maybe parse from existing
2024         # cache? for now, we only lose lvm data when we rename, which
2025         # is less critical than DRBD or MD
2026     except errors.BlockDeviceError, err:
2027       msgs.append("Can't rename device '%s' to '%s': %s" %
2028                   (dev, unique_id, err))
2029       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2030       result = False
2031   return (result, "; ".join(msgs))
2032
2033
2034 def _TransformFileStorageDir(file_storage_dir):
2035   """Checks whether given file_storage_dir is valid.
2036
2037   Checks wheter the given file_storage_dir is within the cluster-wide
2038   default file_storage_dir stored in SimpleStore. Only paths under that
2039   directory are allowed.
2040
2041   @type file_storage_dir: str
2042   @param file_storage_dir: the path to check
2043
2044   @return: the normalized path if valid, None otherwise
2045
2046   """
2047   cfg = _GetConfig()
2048   file_storage_dir = os.path.normpath(file_storage_dir)
2049   base_file_storage_dir = cfg.GetFileStorageDir()
2050   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2051       base_file_storage_dir):
2052     logging.error("file storage directory '%s' is not under base file"
2053                   " storage directory '%s'",
2054                   file_storage_dir, base_file_storage_dir)
2055     return None
2056   return file_storage_dir
2057
2058
2059 def CreateFileStorageDir(file_storage_dir):
2060   """Create file storage directory.
2061
2062   @type file_storage_dir: str
2063   @param file_storage_dir: directory to create
2064
2065   @rtype: tuple
2066   @return: tuple with first element a boolean indicating wheter dir
2067       creation was successful or not
2068
2069   """
2070   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2071   result = True,
2072   if not file_storage_dir:
2073     result = False,
2074   else:
2075     if os.path.exists(file_storage_dir):
2076       if not os.path.isdir(file_storage_dir):
2077         logging.error("'%s' is not a directory", file_storage_dir)
2078         result = False,
2079     else:
2080       try:
2081         os.makedirs(file_storage_dir, 0750)
2082       except OSError, err:
2083         logging.error("Cannot create file storage directory '%s': %s",
2084                       file_storage_dir, err)
2085         result = False,
2086   return result
2087
2088
2089 def RemoveFileStorageDir(file_storage_dir):
2090   """Remove file storage directory.
2091
2092   Remove it only if it's empty. If not log an error and return.
2093
2094   @type file_storage_dir: str
2095   @param file_storage_dir: the directory we should cleanup
2096   @rtype: tuple (success,)
2097   @return: tuple of one element, C{success}, denoting
2098       whether the operation was successfull
2099
2100   """
2101   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2102   result = True,
2103   if not file_storage_dir:
2104     result = False,
2105   else:
2106     if os.path.exists(file_storage_dir):
2107       if not os.path.isdir(file_storage_dir):
2108         logging.error("'%s' is not a directory", file_storage_dir)
2109         result = False,
2110       # deletes dir only if empty, otherwise we want to return False
2111       try:
2112         os.rmdir(file_storage_dir)
2113       except OSError, err:
2114         logging.exception("Cannot remove file storage directory '%s'",
2115                           file_storage_dir)
2116         result = False,
2117   return result
2118
2119
2120 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2121   """Rename the file storage directory.
2122
2123   @type old_file_storage_dir: str
2124   @param old_file_storage_dir: the current path
2125   @type new_file_storage_dir: str
2126   @param new_file_storage_dir: the name we should rename to
2127   @rtype: tuple (success,)
2128   @return: tuple of one element, C{success}, denoting
2129       whether the operation was successful
2130
2131   """
2132   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2133   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2134   result = True,
2135   if not old_file_storage_dir or not new_file_storage_dir:
2136     result = False,
2137   else:
2138     if not os.path.exists(new_file_storage_dir):
2139       if os.path.isdir(old_file_storage_dir):
2140         try:
2141           os.rename(old_file_storage_dir, new_file_storage_dir)
2142         except OSError, err:
2143           logging.exception("Cannot rename '%s' to '%s'",
2144                             old_file_storage_dir, new_file_storage_dir)
2145           result =  False,
2146       else:
2147         logging.error("'%s' is not a directory", old_file_storage_dir)
2148         result = False,
2149     else:
2150       if os.path.exists(old_file_storage_dir):
2151         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2152                       old_file_storage_dir, new_file_storage_dir)
2153         result = False,
2154   return result
2155
2156
2157 def _IsJobQueueFile(file_name):
2158   """Checks whether the given filename is in the queue directory.
2159
2160   @type file_name: str
2161   @param file_name: the file name we should check
2162   @rtype: boolean
2163   @return: whether the file is under the queue directory
2164
2165   """
2166   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2167   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2168
2169   if not result:
2170     logging.error("'%s' is not a file in the queue directory",
2171                   file_name)
2172
2173   return result
2174
2175
2176 def JobQueueUpdate(file_name, content):
2177   """Updates a file in the queue directory.
2178
2179   This is just a wrapper over L{utils.WriteFile}, with proper
2180   checking.
2181
2182   @type file_name: str
2183   @param file_name: the job file name
2184   @type content: str
2185   @param content: the new job contents
2186   @rtype: boolean
2187   @return: the success of the operation
2188
2189   """
2190   if not _IsJobQueueFile(file_name):
2191     return False
2192
2193   # Write and replace the file atomically
2194   utils.WriteFile(file_name, data=_Decompress(content))
2195
2196   return True
2197
2198
2199 def JobQueueRename(old, new):
2200   """Renames a job queue file.
2201
2202   This is just a wrapper over os.rename with proper checking.
2203
2204   @type old: str
2205   @param old: the old (actual) file name
2206   @type new: str
2207   @param new: the desired file name
2208   @rtype: boolean
2209   @return: the success of the operation
2210
2211   """
2212   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2213     return False
2214
2215   utils.RenameFile(old, new, mkdir=True)
2216
2217   return True
2218
2219
2220 def JobQueueSetDrainFlag(drain_flag):
2221   """Set the drain flag for the queue.
2222
2223   This will set or unset the queue drain flag.
2224
2225   @type drain_flag: boolean
2226   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2227   @rtype: boolean
2228   @return: always True
2229   @warning: the function always returns True
2230
2231   """
2232   if drain_flag:
2233     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2234   else:
2235     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2236
2237   return True
2238
2239
2240 def BlockdevClose(instance_name, disks):
2241   """Closes the given block devices.
2242
2243   This means they will be switched to secondary mode (in case of
2244   DRBD).
2245
2246   @param instance_name: if the argument is not empty, the symlinks
2247       of this instance will be removed
2248   @type disks: list of L{objects.Disk}
2249   @param disks: the list of disks to be closed
2250   @rtype: tuple (success, message)
2251   @return: a tuple of success and message, where success
2252       indicates the succes of the operation, and message
2253       which will contain the error details in case we
2254       failed
2255
2256   """
2257   bdevs = []
2258   for cf in disks:
2259     rd = _RecursiveFindBD(cf)
2260     if rd is None:
2261       _Fail("Can't find device %s", cf)
2262     bdevs.append(rd)
2263
2264   msg = []
2265   for rd in bdevs:
2266     try:
2267       rd.Close()
2268     except errors.BlockDeviceError, err:
2269       msg.append(str(err))
2270   if msg:
2271     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2272   else:
2273     if instance_name:
2274       _RemoveBlockDevLinks(instance_name, disks)
2275     return (True, "All devices secondary")
2276
2277
2278 def ValidateHVParams(hvname, hvparams):
2279   """Validates the given hypervisor parameters.
2280
2281   @type hvname: string
2282   @param hvname: the hypervisor name
2283   @type hvparams: dict
2284   @param hvparams: the hypervisor parameters to be validated
2285   @rtype: tuple (success, message)
2286   @return: a tuple of success and message, where success
2287       indicates the succes of the operation, and message
2288       which will contain the error details in case we
2289       failed
2290
2291   """
2292   try:
2293     hv_type = hypervisor.GetHypervisor(hvname)
2294     hv_type.ValidateParameters(hvparams)
2295     return (True, "Validation passed")
2296   except errors.HypervisorError, err:
2297     return (False, str(err))
2298
2299
2300 def DemoteFromMC():
2301   """Demotes the current node from master candidate role.
2302
2303   """
2304   # try to ensure we're not the master by mistake
2305   master, myself = ssconf.GetMasterAndMyself()
2306   if master == myself:
2307     return (False, "ssconf status shows I'm the master node, will not demote")
2308   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2309   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2310     return (False, "The master daemon is running, will not demote")
2311   try:
2312     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2313   except EnvironmentError, err:
2314     if err.errno != errno.ENOENT:
2315       return (False, "Error while backing up cluster file: %s" % str(err))
2316   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2317   return (True, "Done")
2318
2319
2320 def _FindDisks(nodes_ip, disks):
2321   """Sets the physical ID on disks and returns the block devices.
2322
2323   """
2324   # set the correct physical ID
2325   my_name = utils.HostInfo().name
2326   for cf in disks:
2327     cf.SetPhysicalID(my_name, nodes_ip)
2328
2329   bdevs = []
2330
2331   for cf in disks:
2332     rd = _RecursiveFindBD(cf)
2333     if rd is None:
2334       return (False, "Can't find device %s" % cf)
2335     bdevs.append(rd)
2336   return (True, bdevs)
2337
2338
2339 def DrbdDisconnectNet(nodes_ip, disks):
2340   """Disconnects the network on a list of drbd devices.
2341
2342   """
2343   status, bdevs = _FindDisks(nodes_ip, disks)
2344   if not status:
2345     return status, bdevs
2346
2347   # disconnect disks
2348   for rd in bdevs:
2349     try:
2350       rd.DisconnectNet()
2351     except errors.BlockDeviceError, err:
2352       _Fail("Can't change network configuration to standalone mode: %s",
2353             err, exc=True)
2354   return (True, "All disks are now disconnected")
2355
2356
2357 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2358   """Attaches the network on a list of drbd devices.
2359
2360   """
2361   status, bdevs = _FindDisks(nodes_ip, disks)
2362   if not status:
2363     return status, bdevs
2364
2365   if multimaster:
2366     for idx, rd in enumerate(bdevs):
2367       try:
2368         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2369       except EnvironmentError, err:
2370         _Fail("Can't create symlink: %s", err)
2371   # reconnect disks, switch to new master configuration and if
2372   # needed primary mode
2373   for rd in bdevs:
2374     try:
2375       rd.AttachNet(multimaster)
2376     except errors.BlockDeviceError, err:
2377       _Fail("Can't change network configuration: %s", err)
2378   # wait until the disks are connected; we need to retry the re-attach
2379   # if the device becomes standalone, as this might happen if the one
2380   # node disconnects and reconnects in a different mode before the
2381   # other node reconnects; in this case, one or both of the nodes will
2382   # decide it has wrong configuration and switch to standalone
2383   RECONNECT_TIMEOUT = 2 * 60
2384   sleep_time = 0.100 # start with 100 miliseconds
2385   timeout_limit = time.time() + RECONNECT_TIMEOUT
2386   while time.time() < timeout_limit:
2387     all_connected = True
2388     for rd in bdevs:
2389       stats = rd.GetProcStatus()
2390       if not (stats.is_connected or stats.is_in_resync):
2391         all_connected = False
2392       if stats.is_standalone:
2393         # peer had different config info and this node became
2394         # standalone, even though this should not happen with the
2395         # new staged way of changing disk configs
2396         try:
2397           rd.ReAttachNet(multimaster)
2398         except errors.BlockDeviceError, err:
2399           _Fail("Can't change network configuration: %s", err)
2400     if all_connected:
2401       break
2402     time.sleep(sleep_time)
2403     sleep_time = min(5, sleep_time * 1.5)
2404   if not all_connected:
2405     return (False, "Timeout in disk reconnecting")
2406   if multimaster:
2407     # change to primary mode
2408     for rd in bdevs:
2409       try:
2410         rd.Open()
2411       except errors.BlockDeviceError, err:
2412         _Fail("Can't change to primary mode: %s", err)
2413   if multimaster:
2414     msg = "multi-master and primary"
2415   else:
2416     msg = "single-master"
2417   return (True, "Disks are now configured as %s" % msg)
2418
2419
2420 def DrbdWaitSync(nodes_ip, disks):
2421   """Wait until DRBDs have synchronized.
2422
2423   """
2424   status, bdevs = _FindDisks(nodes_ip, disks)
2425   if not status:
2426     return status, bdevs
2427
2428   min_resync = 100
2429   alldone = True
2430   failure = False
2431   for rd in bdevs:
2432     stats = rd.GetProcStatus()
2433     if not (stats.is_connected or stats.is_in_resync):
2434       failure = True
2435       break
2436     alldone = alldone and (not stats.is_in_resync)
2437     if stats.sync_percent is not None:
2438       min_resync = min(min_resync, stats.sync_percent)
2439   return (not failure, (alldone, min_resync))
2440
2441
2442 def PowercycleNode(hypervisor_type):
2443   """Hard-powercycle the node.
2444
2445   Because we need to return first, and schedule the powercycle in the
2446   background, we won't be able to report failures nicely.
2447
2448   """
2449   hyper = hypervisor.GetHypervisor(hypervisor_type)
2450   try:
2451     pid = os.fork()
2452   except OSError, err:
2453     # if we can't fork, we'll pretend that we're in the child process
2454     pid = 0
2455   if pid > 0:
2456     return (True, "Reboot scheduled in 5 seconds")
2457   time.sleep(5)
2458   hyper.PowercycleNode()
2459
2460
2461 class HooksRunner(object):
2462   """Hook runner.
2463
2464   This class is instantiated on the node side (ganeti-noded) and not
2465   on the master side.
2466
2467   """
2468   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2469
2470   def __init__(self, hooks_base_dir=None):
2471     """Constructor for hooks runner.
2472
2473     @type hooks_base_dir: str or None
2474     @param hooks_base_dir: if not None, this overrides the
2475         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2476
2477     """
2478     if hooks_base_dir is None:
2479       hooks_base_dir = constants.HOOKS_BASE_DIR
2480     self._BASE_DIR = hooks_base_dir
2481
2482   @staticmethod
2483   def ExecHook(script, env):
2484     """Exec one hook script.
2485
2486     @type script: str
2487     @param script: the full path to the script
2488     @type env: dict
2489     @param env: the environment with which to exec the script
2490     @rtype: tuple (success, message)
2491     @return: a tuple of success and message, where success
2492         indicates the succes of the operation, and message
2493         which will contain the error details in case we
2494         failed
2495
2496     """
2497     # exec the process using subprocess and log the output
2498     fdstdin = None
2499     try:
2500       fdstdin = open("/dev/null", "r")
2501       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2502                                stderr=subprocess.STDOUT, close_fds=True,
2503                                shell=False, cwd="/", env=env)
2504       output = ""
2505       try:
2506         output = child.stdout.read(4096)
2507         child.stdout.close()
2508       except EnvironmentError, err:
2509         output += "Hook script error: %s" % str(err)
2510
2511       while True:
2512         try:
2513           result = child.wait()
2514           break
2515         except EnvironmentError, err:
2516           if err.errno == errno.EINTR:
2517             continue
2518           raise
2519     finally:
2520       # try not to leak fds
2521       for fd in (fdstdin, ):
2522         if fd is not None:
2523           try:
2524             fd.close()
2525           except EnvironmentError, err:
2526             # just log the error
2527             #logging.exception("Error while closing fd %s", fd)
2528             pass
2529
2530     return result == 0, utils.SafeEncode(output.strip())
2531
2532   def RunHooks(self, hpath, phase, env):
2533     """Run the scripts in the hooks directory.
2534
2535     @type hpath: str
2536     @param hpath: the path to the hooks directory which
2537         holds the scripts
2538     @type phase: str
2539     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2540         L{constants.HOOKS_PHASE_POST}
2541     @type env: dict
2542     @param env: dictionary with the environment for the hook
2543     @rtype: list
2544     @return: list of 3-element tuples:
2545       - script path
2546       - script result, either L{constants.HKR_SUCCESS} or
2547         L{constants.HKR_FAIL}
2548       - output of the script
2549
2550     @raise errors.ProgrammerError: for invalid input
2551         parameters
2552
2553     """
2554     if phase == constants.HOOKS_PHASE_PRE:
2555       suffix = "pre"
2556     elif phase == constants.HOOKS_PHASE_POST:
2557       suffix = "post"
2558     else:
2559       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2560     rr = []
2561
2562     subdir = "%s-%s.d" % (hpath, suffix)
2563     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2564     try:
2565       dir_contents = utils.ListVisibleFiles(dir_name)
2566     except OSError, err:
2567       # FIXME: must log output in case of failures
2568       return rr
2569
2570     # we use the standard python sort order,
2571     # so 00name is the recommended naming scheme
2572     dir_contents.sort()
2573     for relname in dir_contents:
2574       fname = os.path.join(dir_name, relname)
2575       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2576           self.RE_MASK.match(relname) is not None):
2577         rrval = constants.HKR_SKIP
2578         output = ""
2579       else:
2580         result, output = self.ExecHook(fname, env)
2581         if not result:
2582           rrval = constants.HKR_FAIL
2583         else:
2584           rrval = constants.HKR_SUCCESS
2585       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2586
2587     return rr
2588
2589
2590 class IAllocatorRunner(object):
2591   """IAllocator runner.
2592
2593   This class is instantiated on the node side (ganeti-noded) and not on
2594   the master side.
2595
2596   """
2597   def Run(self, name, idata):
2598     """Run an iallocator script.
2599
2600     @type name: str
2601     @param name: the iallocator script name
2602     @type idata: str
2603     @param idata: the allocator input data
2604
2605     @rtype: tuple
2606     @return: four element tuple of:
2607        - run status (one of the IARUN_ constants)
2608        - stdout
2609        - stderr
2610        - fail reason (as from L{utils.RunResult})
2611
2612     """
2613     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2614                                   os.path.isfile)
2615     if alloc_script is None:
2616       return (constants.IARUN_NOTFOUND, None, None, None)
2617
2618     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2619     try:
2620       os.write(fd, idata)
2621       os.close(fd)
2622       result = utils.RunCmd([alloc_script, fin_name])
2623       if result.failed:
2624         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2625                 result.fail_reason)
2626     finally:
2627       os.unlink(fin_name)
2628
2629     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2630
2631
2632 class DevCacheManager(object):
2633   """Simple class for managing a cache of block device information.
2634
2635   """
2636   _DEV_PREFIX = "/dev/"
2637   _ROOT_DIR = constants.BDEV_CACHE_DIR
2638
2639   @classmethod
2640   def _ConvertPath(cls, dev_path):
2641     """Converts a /dev/name path to the cache file name.
2642
2643     This replaces slashes with underscores and strips the /dev
2644     prefix. It then returns the full path to the cache file.
2645
2646     @type dev_path: str
2647     @param dev_path: the C{/dev/} path name
2648     @rtype: str
2649     @return: the converted path name
2650
2651     """
2652     if dev_path.startswith(cls._DEV_PREFIX):
2653       dev_path = dev_path[len(cls._DEV_PREFIX):]
2654     dev_path = dev_path.replace("/", "_")
2655     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2656     return fpath
2657
2658   @classmethod
2659   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2660     """Updates the cache information for a given device.
2661
2662     @type dev_path: str
2663     @param dev_path: the pathname of the device
2664     @type owner: str
2665     @param owner: the owner (instance name) of the device
2666     @type on_primary: bool
2667     @param on_primary: whether this is the primary
2668         node nor not
2669     @type iv_name: str
2670     @param iv_name: the instance-visible name of the
2671         device, as in objects.Disk.iv_name
2672
2673     @rtype: None
2674
2675     """
2676     if dev_path is None:
2677       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2678       return
2679     fpath = cls._ConvertPath(dev_path)
2680     if on_primary:
2681       state = "primary"
2682     else:
2683       state = "secondary"
2684     if iv_name is None:
2685       iv_name = "not_visible"
2686     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2687     try:
2688       utils.WriteFile(fpath, data=fdata)
2689     except EnvironmentError, err:
2690       logging.exception("Can't update bdev cache for %s", dev_path)
2691
2692   @classmethod
2693   def RemoveCache(cls, dev_path):
2694     """Remove data for a dev_path.
2695
2696     This is just a wrapper over L{utils.RemoveFile} with a converted
2697     path name and logging.
2698
2699     @type dev_path: str
2700     @param dev_path: the pathname of the device
2701
2702     @rtype: None
2703
2704     """
2705     if dev_path is None:
2706       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2707       return
2708     fpath = cls._ConvertPath(dev_path)
2709     try:
2710       utils.RemoveFile(fpath)
2711     except EnvironmentError, err:
2712       logging.exception("Can't update bdev cache for %s", dev_path)