4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25 the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27 in the L{_CleanDirectory} function
31 # pylint: disable-msg=E1103
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
# Module-level paths and constants used by the node-daemon helpers below.
# NOTE(review): this extract is missing lines — several entries of
# _ALLOWED_CLEAN_DIRS and the closing bracket of the frozenset are not
# visible here; confirm against the complete file.
64 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
65 _ALLOWED_CLEAN_DIRS = frozenset([
67 constants.JOB_QUEUE_ARCHIVE_DIR,
69 constants.CRYPTO_KEYS_DIR,
# Maximum validity of generated SSL certificates: one week, in seconds.
71 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
# File names used inside X509 key/cert and import/export status directories.
72 _X509_KEY_FILE = "key"
73 _X509_CERT_FILE = "cert"
74 _IES_STATUS_FILE = "status"
class RPCFail(Exception):
  """Exception signalling the failure of an RPC call.

  The single constructor argument carries the error message which is
  reported back to the master daemon.

  """
# Log an error and raise an RPCFail carrying the formatted message.
# NOTE(review): this extract is missing lines — the '%'-interpolation of
# *args into msg, the non-exception logging branch and the final
# 'raise RPCFail(msg)' are not visible here; confirm against the full file.
87 def _Fail(msg, *args, **kwargs):
88 """Log an error and the raise an RPCFail exception.
90 This exception is then handled specially in the ganeti daemon and
91 turned into a 'failed' return type. As such, this function is a
92 useful shortcut for logging the error and returning it to the master
96 @param msg: the text of the exception
102 if "log" not in kwargs or kwargs["log"]: # if we should log this error
103 if "exc" in kwargs and kwargs["exc"]:
# logging.exception additionally records the active traceback.
104 logging.exception(msg)
# Thin factory returning a fresh ssconf.SimpleStore instance.
# NOTE(review): the 'def _GetConfig()' line of this helper is not visible
# in this extract; only docstring fragments and the return remain.
111 """Simple wrapper to return a SimpleStore.
113 @rtype: L{ssconf.SimpleStore}
114 @return: a SimpleStore instance
117 return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Build and return an SshRunner for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
def _Decompress(data):
  """Unpack data compressed by the RPC client.

  @type data: list or tuple
  @param data: (encoding, content) pair sent by the RPC client
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  encoding, content = data
  # Guard-clause style: handle each known encoding and fall through to
  # an assertion for anything unexpected.
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  raise AssertionError("Unknown data encoding")
# Remove every plain (non-symlink) visible file in a whitelisted directory.
# NOTE(review): this extract is missing lines — the 'exclude is None'
# default handling, the second _Fail argument, the early returns and the
# loop 'continue' are not visible; confirm against the complete file.
153 def _CleanDirectory(path, exclude=None):
154 """Removes all regular files in a directory.
157 @param path: the directory to clean
159 @param exclude: list of files to be excluded, defaults
# Refuse to clean anything outside the fixed whitelist of directories.
163 if path not in _ALLOWED_CLEAN_DIRS:
164 _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
# A non-existing directory is not an error: there is nothing to clean.
167 if not os.path.isdir(path):
172 # Normalize excluded paths
173 exclude = [os.path.normpath(i) for i in exclude]
175 for rel_name in utils.ListVisibleFiles(path):
176 full_name = utils.PathJoin(path, rel_name)
177 if full_name in exclude:
# Only regular files are removed; symlinks and subdirectories are kept.
179 if os.path.isfile(full_name) and not os.path.islink(full_name):
180 utils.RemoveFile(full_name)
# NOTE(review): this extract is missing lines — some entries of the
# allowed_files set and its closing bracket are not visible here.
183 def _BuildUploadFileList():
184 """Build the list of allowed upload files.
186 This is abstracted so that it's built only once at module import time.
189 allowed_files = set([
190 constants.CLUSTER_CONF_FILE,
192 constants.SSH_KNOWN_HOSTS_FILE,
193 constants.VNC_PASSWORD_FILE,
194 constants.RAPI_CERT_FILE,
195 constants.RAPI_USERS_FILE,
196 constants.CONFD_HMAC_KEY,
197 constants.CLUSTER_DOMAIN_SECRET_FILE,
# Every configured hypervisor may declare extra ancillary files to sync.
200 for hv_name in constants.HYPER_TYPES:
201 hv_class = hypervisor.GetHypervisorClass(hv_name)
202 allowed_files.update(hv_class.GetAncillaryFiles())
# Frozen so callers cannot mutate the module-level whitelist.
204 return frozenset(allowed_files)
# Whitelist consulted by UploadFile; computed once at module import time.
207 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
# Purge the job queue and its archive directory.
# NOTE(review): the 'def JobQueuePurge()' line is not visible in this
# extract; only the docstring opening and the body remain.
211 """Removes job queue files and archived jobs.
# The queue lock file must survive the cleanup, hence the exclude.
217 _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
218 _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
# Compute (master_netdev, master_ip, master_node) from the simple store.
# NOTE(review): the 'def GetMasterInfo()' line, the _GetConfig() call and
# the 'try:' opening the block below are not visible in this extract.
222 """Returns master information.
224 This is an utility function to compute master information, either
225 for consumption here or from the node daemon.
228 @return: master_netdev, master_ip, master_name
229 @raise RPCFail: in case of errors
234 master_netdev = cfg.GetMasterNetdev()
235 master_ip = cfg.GetMasterIP()
236 master_node = cfg.GetMasterNode()
237 except errors.ConfigurationError, err:
# _Fail raises RPCFail, so the return below is only reached on success.
238 _Fail("Cluster configuration incomplete: %s", err, exc=True)
239 return (master_netdev, master_ip, master_node)
# NOTE(review): this extract is missing many lines of this function
# (err_msgs initialisation, the start_daemons branch structure, the
# result.failed checks and err_msgs.append calls); the visible lines are
# kept byte-identical below — confirm against the complete file.
242 def StartMaster(start_daemons, no_voting):
243 """Activate local node as master node.
245 The function will either try activate the IP address of the master
246 (unless someone else has it) or also start the master daemons, based
247 on the start_daemons parameter.
249 @type start_daemons: boolean
250 @param start_daemons: whether to start the master daemons
251 (ganeti-masterd and ganeti-rapi), or (if false) activate the
253 @type no_voting: boolean
254 @param no_voting: whether to start ganeti-masterd without a node vote
255 (if start_daemons is True), but still non-interactively
259 # GetMasterInfo will raise an exception if not able to return data
260 master_netdev, master_ip, _ = GetMasterInfo()
263 # either start the master and rapi daemons
266 masterd_args = "--no-voting --yes-do-it"
271 "EXTRA_MASTERD_ARGS": masterd_args,
274 result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
276 msg = "Can't start Ganeti master: %s" % result.output
# Probing our own noded port first tells us whether the IP is taken.
281 if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
282 if netutils.OwnIpAddress(master_ip):
283 # we already have the ip:
284 logging.debug("Master IP already configured, doing nothing")
286 msg = "Someone else has the master ip, not activating"
# Attach the master IP as a /32 label alias on the master netdev.
290 result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
291 "dev", master_netdev, "label",
292 "%s:0" % master_netdev])
294 msg = "Can't activate master IP: %s" % result.output
# Gratuitous ARP so neighbours update their caches quickly.
298 result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
299 "-s", master_ip, master_ip])
300 # we'll ignore the exit code of arping
303 _Fail("; ".join(err_msgs))
# NOTE(review): this extract is missing lines — the result.failed checks
# guarding both error branches and the stop_daemons conditional are not
# visible here; confirm against the complete file.
306 def StopMaster(stop_daemons):
307 """Deactivate this node as master.
309 The function will always try to deactivate the IP address of the
310 master. It will also stop the master daemons depending on the
311 stop_daemons parameter.
313 @type stop_daemons: boolean
314 @param stop_daemons: whether to also stop the master daemons
315 (ganeti-masterd and ganeti-rapi)
319 # TODO: log and report back to the caller the error failures; we
320 # need to decide in which case we fail the RPC for this
322 # GetMasterInfo will raise an exception if not able to return data
323 master_netdev, master_ip, _ = GetMasterInfo()
# Drop the /32 master-IP alias added by StartMaster.
325 result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
326 "dev", master_netdev])
328 logging.error("Can't remove the master IP, error: %s", result.output)
329 # but otherwise ignore the failure
332 result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
# Failure to stop the daemons is logged but does not fail the RPC.
334 logging.error("Could not stop Ganeti master, command %s had exitcode %s"
336 result.cmd, result.exit_code, result.output)
# NOTE(review): this extract is missing lines — the 'try:' around
# GetUserFiles, the second GetUserFiles argument, and the result.failed
# check before the final _Fail are not visible; confirm against the
# complete file.
339 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
340 """Joins this node to the cluster.
342 This does the following:
343 - updates the hostkeys of the machine (rsa and dsa)
344 - adds the ssh private key to the user
345 - adds the ssh public key to the users' authorized_keys file
348 @param dsa: the DSA private key to write
350 @param dsapub: the DSA public key to write
352 @param rsa: the RSA private key to write
354 @param rsapub: the RSA public key to write
356 @param sshkey: the SSH private key to write
358 @param sshpub: the SSH public key to write
360 @return: the success of the operation
# Host keys: private keys 0600, public keys 0644.
363 sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
364 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
365 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
366 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
367 for name, content, mode in sshd_keys:
368 utils.WriteFile(name, data=content, mode=mode)
371 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
373 except errors.OpExecError, err:
374 _Fail("Error while processing user ssh files: %s", err, exc=True)
376 for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
377 utils.WriteFile(name, data=content, mode=0600)
379 utils.AddAuthorizedKey(auth_keys, sshpub)
# sshd must reload so the new host keys take effect.
381 result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
383 _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
384 result.cmd, result.exit_code, result.output)
# NOTE(review): this extract is missing lines — the 'if modify_ssh_setup:'
# guard, several 'try:' openers and the result.failed check before the
# final logging.error are not visible; confirm against the complete file.
387 def LeaveCluster(modify_ssh_setup):
388 """Cleans up and remove the current node.
390 This function cleans up and prepares the current node to be removed
393 If processing is successful, then it raises an
394 L{errors.QuitGanetiException} which is used as a special case to
395 shutdown the node daemon.
397 @param modify_ssh_setup: boolean
400 _CleanDirectory(constants.DATA_DIR)
401 _CleanDirectory(constants.CRYPTO_KEYS_DIR)
406 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
408 utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
410 utils.RemoveFile(priv_key)
411 utils.RemoveFile(pub_key)
412 except errors.OpExecError:
# Best-effort: ssh cleanup failures are logged, not fatal.
413 logging.exception("Error while processing ssh files")
416 utils.RemoveFile(constants.CONFD_HMAC_KEY)
417 utils.RemoveFile(constants.RAPI_CERT_FILE)
418 utils.RemoveFile(constants.NODED_CERT_FILE)
419 except: # pylint: disable-msg=W0702
420 logging.exception("Error while removing cluster secrets")
422 result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
424 logging.error("Command %s failed with exitcode %s and error %s",
425 result.cmd, result.exit_code, result.output)
427 # Raise a custom exception (handled in ganeti-noded)
428 raise errors.QuitGanetiException(True, 'Shutdown scheduled')
# NOTE(review): this extract is missing lines — the initialisation of
# 'outputarray' and the final 'return outputarray' are not visible here;
# confirm against the complete file.
431 def GetNodeInfo(vgname, hypervisor_type):
432 """Gives back a hash with different information about the node.
434 @type vgname: C{string}
435 @param vgname: the name of the volume group to ask for disk space information
436 @type hypervisor_type: C{str}
437 @param hypervisor_type: the name of the hypervisor to ask for
440 @return: dictionary with the following keys:
441 - vg_size is the size of the configured volume group in MiB
442 - vg_free is the free size of the volume group in MiB
443 - memory_dom0 is the memory allocated for domain0 in MiB
444 - memory_free is the currently available (free) ram in MiB
445 - memory_total is the total number of ram in MiB
449 vginfo = _GetVGInfo(vgname)
450 outputarray['vg_size'] = vginfo['vg_size']
451 outputarray['vg_free'] = vginfo['vg_free']
453 hyper = hypervisor.GetHypervisor(hypervisor_type)
454 hyp_info = hyper.GetNodeInfo()
455 if hyp_info is not None:
456 outputarray.update(hyp_info)
# boot_id changes on every reboot, so callers can detect node restarts.
458 outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
# NOTE(review): this extract is missing many lines of this long function
# (the 'result = {}' initialisation, several 'try:' openers, loop bodies
# filling 'tmp', the source/else branch for NV_MASTERIP and the final
# 'return result'); the visible lines are kept byte-identical below —
# confirm against the complete file.
463 def VerifyNode(what, cluster_name):
464 """Verify the status of the local node.
466 Based on the input L{what} parameter, various checks are done on the
469 If the I{filelist} key is present, this list of
470 files is checksummed and the file/checksum pairs are returned.
472 If the I{nodelist} key is present, we check that we have
473 connectivity via ssh with the target nodes (and check the hostname
476 If the I{node-net-test} key is present, we check that we have
477 connectivity to the given nodes via both primary IP and, if
478 applicable, secondary IPs.
481 @param what: a dictionary of things to check:
482 - filelist: list of files for which to compute checksums
483 - nodelist: list of nodes we should check ssh communication with
484 - node-net-test: list of nodes we should check node daemon port
486 - hypervisor: list with hypervisors to run the verify for
488 @return: a dictionary with the same keys as the input dict, and
489 values representing the result of the checks
493 my_name = netutils.HostInfo().name
494 port = netutils.GetDaemonPort(constants.NODED)
496 if constants.NV_HYPERVISOR in what:
497 result[constants.NV_HYPERVISOR] = tmp = {}
498 for hv_name in what[constants.NV_HYPERVISOR]:
500 val = hypervisor.GetHypervisor(hv_name).Verify()
501 except errors.HypervisorError, err:
502 val = "Error while checking hypervisor: %s" % str(err)
505 if constants.NV_FILELIST in what:
506 result[constants.NV_FILELIST] = utils.FingerprintFiles(
507 what[constants.NV_FILELIST])
509 if constants.NV_NODELIST in what:
510 result[constants.NV_NODELIST] = tmp = {}
# Shuffling spreads the ssh load across the cluster between runs.
511 random.shuffle(what[constants.NV_NODELIST])
512 for node in what[constants.NV_NODELIST]:
513 success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
517 if constants.NV_NODENETTEST in what:
518 result[constants.NV_NODENETTEST] = tmp = {}
519 my_pip = my_sip = None
# First pass: find our own primary/secondary IPs in the node list.
520 for name, pip, sip in what[constants.NV_NODENETTEST]:
526 tmp[my_name] = ("Can't find my own primary/secondary IP"
# Second pass: ping every node from our own addresses.
529 for name, pip, sip in what[constants.NV_NODENETTEST]:
531 if not netutils.TcpPing(pip, port, source=my_pip):
532 fail.append("primary")
534 if not netutils.TcpPing(sip, port, source=my_sip):
535 fail.append("secondary")
537 tmp[name] = ("failure using the %s interface(s)" %
540 if constants.NV_MASTERIP in what:
541 # FIXME: add checks on incoming data structures (here and in the
542 # rest of the function)
543 master_name, master_ip = what[constants.NV_MASTERIP]
544 if master_name == my_name:
545 source = constants.IP4_ADDRESS_LOCALHOST
548 result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
551 if constants.NV_LVLIST in what:
553 val = GetVolumeList(what[constants.NV_LVLIST])
556 result[constants.NV_LVLIST] = val
558 if constants.NV_INSTANCELIST in what:
559 # GetInstanceList can fail
561 val = GetInstanceList(what[constants.NV_INSTANCELIST])
564 result[constants.NV_INSTANCELIST] = val
566 if constants.NV_VGLIST in what:
567 result[constants.NV_VGLIST] = utils.ListVolumeGroups()
569 if constants.NV_PVLIST in what:
570 result[constants.NV_PVLIST] = \
571 bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
572 filter_allocatable=False)
574 if constants.NV_VERSION in what:
575 result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
576 constants.RELEASE_VERSION)
578 if constants.NV_HVINFO in what:
579 hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
580 result[constants.NV_HVINFO] = hyper.GetNodeInfo()
582 if constants.NV_DRBDLIST in what:
584 used_minors = bdev.DRBD8.GetUsedDevs().keys()
585 except errors.BlockDeviceError, err:
586 logging.warning("Can't get used minors list", exc_info=True)
# On error the string replaces the list, signalling failure to the master.
587 used_minors = str(err)
588 result[constants.NV_DRBDLIST] = used_minors
590 if constants.NV_DRBDHELPER in what:
593 payload = bdev.BaseDRBD.GetUsermodeHelper()
594 except errors.BlockDeviceError, err:
595 logging.error("Can't get DRBD usermode helper: %s", str(err))
598 result[constants.NV_DRBDHELPER] = (status, payload)
600 if constants.NV_NODESETUP in what:
601 result[constants.NV_NODESETUP] = tmpr = []
602 if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
603 tmpr.append("The sysfs filesytem doesn't seem to be mounted"
604 " under /sys, missing required directories /sys/block"
605 " and /sys/class/net")
606 if (not os.path.isdir("/proc/sys") or
607 not os.path.isfile("/proc/sysrq-trigger")):
608 tmpr.append("The procfs filesystem doesn't seem to be mounted"
609 " under /proc, missing required directory /proc/sys and"
610 " the file /proc/sysrq-trigger")
612 if constants.NV_TIME in what:
613 result[constants.NV_TIME] = utils.SplitTime(time.time())
615 if constants.NV_OSLIST in what:
616 result[constants.NV_OSLIST] = DiagnoseOS()
# NOTE(review): this extract is missing lines — the 'lvs = {}' and
# separator initialisation, the result.failed check, the loop 'continue',
# the virtual-volume skip and the final 'return lvs' are not visible;
# confirm against the complete file.
621 def GetVolumeList(vg_name):
622 """Compute list of logical volumes and their size.
625 @param vg_name: the volume group whose LVs we should list
628 dictionary of all partions (key) with value being a tuple of
629 their size (in MiB), inactive and online status::
631 {'test1': ('20.06', True, True)}
633 in case of errors, a string is returned with the error
639 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
640 "--separator=%s" % sep,
641 "-olv_name,lv_size,lv_attr", vg_name])
643 _Fail("Failed to list logical volumes, lvs output: %s", result.output)
# Matches "name|size|attr" rows emitted by lvs with the custom separator.
645 valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
646 for line in result.stdout.splitlines():
648 match = valid_line_re.match(line)
650 logging.error("Invalid line returned from lvs output: '%s'", line)
652 name, size, attr = match.groups()
# lv_attr positions: [0] volume type, [4] state, [5] open/online flag.
653 inactive = attr[4] == '-'
654 online = attr[5] == 'o'
655 virtual = attr[0] == 'v'
657 # we don't want to report such volumes as existing, since they
658 # don't really hold data
660 lvs[name] = (size, inactive, online)
def ListVolumeGroups():
  """List the volume groups and their size.

  This is a thin RPC-facing wrapper around L{utils.ListVolumeGroups}.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
# List all LVM logical volumes on this node via 'lvs'.
# NOTE(review): the 'def NodeVolumes()' line, the result.failed check, the
# 'def parse_dev'/'def handle_dev'/'def map_line' headers, the
# 'all_devs = []' initialisation, the else branch and the final return are
# not visible in this extract; confirm against the complete file.
677 """List all volumes on this node.
681 A list of dictionaries, each having four keys:
682 - name: the logical volume name,
683 - size: the size of the logical volume
684 - dev: the physical device on which the LV lives
685 - vg: the volume group to which it belongs
687 In case of errors, we return an empty list and log the
690 Note that since a logical volume can live on multiple physical
691 volumes, the resulting list might include a logical volume
695 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
697 "--options=lv_name,lv_size,devices,vg_name"])
699 _Fail("Failed to list logical volumes, lvs output: %s",
# lvs prints devices as "device(extent)"; keep only the device part.
703 return dev.split('(')[0]
706 return [parse_dev(x) for x in dev.split(",")]
709 line = [v.strip() for v in line]
# One output dict per physical device backing the logical volume.
710 return [{'name': line[0], 'size': line[1],
711 'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
714 for line in result.stdout.splitlines():
715 if line.count('|') >= 3:
716 all_devs.extend(map_line(line.split('|')))
718 logging.warning("Strange line in the output from lvs: '%s'", line)
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  # Collect every bridge that utils.BridgeExists cannot find.
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]
  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
# NOTE(review): this extract is missing lines — the 'results = []'
# initialisation, the 'try:' opener and the final 'return results' are not
# visible here; confirm against the complete file.
738 def GetInstanceList(hypervisor_list):
739 """Provides a list of instances.
741 @type hypervisor_list: list
742 @param hypervisor_list: the list of hypervisors to query information
745 @return: a list of all running instances on the current node
746 - instance1.example.com
747 - instance2.example.com
751 for hname in hypervisor_list:
753 names = hypervisor.GetHypervisor(hname).ListInstances()
754 results.extend(names)
755 except errors.HypervisorError, err:
756 _Fail("Error enumerating instances (hypervisor %s): %s",
757 hname, err, exc=True)
# NOTE(review): this extract is missing lines — the 'output = {}'
# initialisation and the final 'return output' are not visible here;
# confirm against the complete file.
762 def GetInstanceInfo(instance, hname):
763 """Gives back the information about an instance as a dictionary.
765 @type instance: string
766 @param instance: the instance name
768 @param hname: the hypervisor type of the instance
771 @return: dictionary with the following keys:
772 - memory: memory size of instance (int)
773 - state: xen state of instance (string)
774 - time: cpu time of instance (float)
779 iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
# A None iinfo means the hypervisor does not know the instance.
780 if iinfo is not None:
# iinfo tuple layout (per the hypervisor API): [2]=memory, [4]=state,
# [5]=cpu time.
781 output['memory'] = iinfo[2]
782 output['state'] = iinfo[4]
783 output['time'] = iinfo[5]
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hv = hypervisor.GetHypervisor(instance.hypervisor)
  name = instance.name
  if name not in hv.ListInstances():
    _Fail("Instance %s is not running", name)

  # Without the per-disk symlinks (created at instance start since 1.2.5)
  # the target node could not locate the block devices.
  for idx, _ in enumerate(instance.disks):
    if not os.path.islink(_GetBlockDevSymlinkPath(name, idx)):
      _Fail("Instance %s was not restarted since ganeti 1.2.5", name)
# NOTE(review): this extract is missing lines — the 'output = {}'
# initialisation, the per-instance value dict construction, the
# 'if name in output' duplicate check and the final 'return output' are
# not visible here; confirm against the complete file.
811 def GetAllInstancesInfo(hypervisor_list):
812 """Gather data about all instances.
814 This is the equivalent of L{GetInstanceInfo}, except that it
815 computes data for all instances at once, thus being faster if one
816 needs data about more than one instance.
818 @type hypervisor_list: list
819 @param hypervisor_list: list of hypervisors to query for instance data
822 @return: dictionary of instance: data, with data having the following keys:
823 - memory: memory size of instance (int)
824 - state: xen state of instance (string)
825 - time: cpu time of instance (float)
826 - vcpus: the number of vcpus
831 for hname in hypervisor_list:
832 iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
834 for name, _, memory, vcpus, state, times in iinfo:
# An instance reported by two hypervisors must agree on its static data.
842 # we only check static parameters, like memory and vcpus,
843 # and not state and time which can change between the
844 # invocations of the different hypervisors
845 for key in 'memory', 'vcpus':
846 if value[key] != output[name][key]:
847 _Fail("Instance %s is running twice"
848 " with different parameters", name)
def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  timestamp = utils.TimestampForFilename()
  base = "%s-%s-%s-%s.log" % (kind, os_name, instance, timestamp)
  return utils.PathJoin(constants.LOG_OS_DIR, base)
# NOTE(review): this extract is missing lines — the 'if reinstall:' guard
# before setting INSTANCE_REINSTALL, the 'if result.failed:' check and the
# result.output argument of the error log are not visible here; confirm
# against the complete file.
874 def InstanceOsAdd(instance, reinstall, debug):
875 """Add an OS to an instance.
877 @type instance: L{objects.Instance}
878 @param instance: Instance whose OS is to be installed
879 @type reinstall: boolean
880 @param reinstall: whether this is an instance reinstall
882 @param debug: debug level, passed to the OS scripts
886 inst_os = OSFromDisk(instance.os)
888 create_env = OSEnvironment(instance, inst_os, debug)
890 create_env['INSTANCE_REINSTALL'] = "1"
892 logfile = _InstanceLogName("add", instance.os, instance.name)
894 result = utils.RunCmd([inst_os.create_script], env=create_env,
895 cwd=inst_os.path, output=logfile,)
897 logging.error("os create command '%s' returned error: %s, logfile: %s,"
898 " output: %s", result.cmd, result.fail_reason, logfile,
# Only the log tail is sent to the master, to keep the RPC small.
900 lines = [utils.SafeEncode(val)
901 for val in utils.TailFile(logfile, lines=20)]
# log=False: already logged above, avoid duplicating the full output.
902 _Fail("OS create script failed (%s), last lines in the"
903 " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  # The script reads the old name from the environment.
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # FIX: the log message previously said "os create command" — a
    # copy-paste from InstanceOsAdd; this is the *rename* script.
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    # Only the log tail is sent to the master, to keep the RPC small;
    # log=False because the failure was already logged above.
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
# NOTE(review): this extract is missing lines — the retval.failed check,
# the early 'return retdic' statements, the field-count check opening the
# else branch and the final return are not visible here; confirm against
# the complete file.
939 def _GetVGInfo(vg_name):
940 """Get information about the volume group.
943 @param vg_name: the volume group which we query
946 A dictionary with the following keys:
947 - C{vg_size} is the total size of the volume group in MiB
948 - C{vg_free} is the free size of the volume group in MiB
949 - C{pv_count} are the number of physical disks in that VG
951 If an error occurs during gathering of data, we return the same dict
952 with keys all set to None.
# All keys default to None, which is also the error return value.
955 retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
957 retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
958 "--nosuffix", "--units=m", "--separator=:", vg_name])
961 logging.error("volume group %s not present", vg_name)
963 valarr = retval.stdout.strip().rstrip(':').split(':')
# Sizes are reported in MiB as floats; round to whole MiB.
967 "vg_size": int(round(float(valarr[0]), 0)),
968 "vg_free": int(round(float(valarr[1]), 0)),
969 "pv_count": int(valarr[2]),
971 except (TypeError, ValueError), err:
972 logging.exception("Fail to parse vgs output: %s", err)
974 logging.error("vgs output has the wrong number of fields (expected"
975 " three): %s", str(valarr))
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the symlink path for the given instance disk index."""
  link_basename = "%s:%d" % (instance_name, idx)
  return utils.PathJoin(constants.DISK_LINKS_DIR, link_basename)
# NOTE(review): this extract is missing lines — the docstring closer, the
# 'try:' opener, the else-branch re-raising unexpected OSErrors and the
# final 'return link_name' are not visible here; confirm against the
# complete file.
984 def _SymlinkBlockDev(instance_name, device_path, idx):
985 """Set up symlinks to a instance's block device.
987 This is an auxiliary function run when an instance is start (on the primary
988 node) or when an instance is migrated (on the target node).
991 @param instance_name: the name of the target instance
992 @param device_path: path of the physical block device, on the node
993 @param idx: the disk index
994 @return: absolute path to the disk's symlink
997 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
999 os.symlink(device_path, link_name)
1000 except OSError, err:
# EEXIST is only tolerated if the existing link already points to the
# right device; otherwise the link is replaced.
1001 if err.errno == errno.EEXIST:
1002 if (not os.path.islink(link_name) or
1003 os.readlink(link_name) != device_path):
1004 os.remove(link_name)
1005 os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @param instance_name: the name of the instance owning the links
  @param disks: the instance's disk objects (only their count is used)

  """
  for idx in range(len(disks)):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        # Best-effort cleanup: log and continue with the remaining links.
        logging.exception("Can't remove symlink '%s'", link_name)
# NOTE(review): this extract is missing lines — the 'block_devices = []'
# initialisation, the 'if device is None:' check, the try/except around
# _SymlinkBlockDev and its error variable are not visible here; confirm
# against the complete file.
1025 def _GatherAndLinkBlockDevs(instance):
1026 """Set up an instance's block device(s).
1028 This is run on the primary node at instance startup. The block
1029 devices must be already assembled.
1031 @type instance: L{objects.Instance}
1032 @param instance: the instance whose disks we shoul assemble
1034 @return: list of (disk_object, device_path)
1038 for idx, disk in enumerate(instance.disks):
1039 device = _RecursiveFindBD(disk)
1041 raise errors.BlockDeviceError("Block device '%s' is not set up." %
1045 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1047 raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
# The hypervisor is given the stable symlink, not the raw device path.
1050 block_devices.append((disk, link_name))
1052 return block_devices
# NOTE(review): this extract is missing lines — the 'return' after the
# already-running log message and the 'try:' opener before
# _GatherAndLinkBlockDevs are not visible here; confirm against the
# complete file.
1055 def StartInstance(instance):
1056 """Start an instance.
1058 @type instance: L{objects.Instance}
1059 @param instance: the instance object
1063 running_instances = GetInstanceList([instance.hypervisor])
# Starting an already-running instance is a no-op, not an error.
1065 if instance.name in running_instances:
1066 logging.info("Instance %s already running, not starting", instance.name)
1070 block_devices = _GatherAndLinkBlockDevs(instance)
1071 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1072 hyper.StartInstance(instance, block_devices)
1073 except errors.BlockDeviceError, err:
1074 _Fail("Block device error: %s", err, exc=True)
1075 except errors.HypervisorError, err:
# On hypervisor failure the created disk symlinks are cleaned up again.
1076 _RemoveBlockDevLinks(instance.name, instance.disks)
1077 _Fail("Hypervisor error: %s", err, exc=True)
# NOTE(review): this extract is missing many lines — the 'return' after
# the not-running log, the 'class _TryShutdown' header and its '__init__'
# and 'fn' method definitions, the 'return' statements inside them, the
# 'try:' openers and the destroy/cleanup conditionals; the visible lines
# are kept byte-identical below — confirm against the complete file.
1080 def InstanceShutdown(instance, timeout):
1081 """Shut an instance down.
1083 @note: this functions uses polling with a hardcoded timeout.
1085 @type instance: L{objects.Instance}
1086 @param instance: the instance object
1087 @type timeout: integer
1088 @param timeout: maximum timeout for soft shutdown
1092 hv_name = instance.hypervisor
1093 hyper = hypervisor.GetHypervisor(hv_name)
1094 iname = instance.name
1096 if instance.name not in hyper.ListInstances():
1097 logging.info("Instance %s not running, doing nothing", iname)
# State for the retried soft-shutdown callable used with utils.Retry.
1102 self.tried_once = False
1105 if iname not in hyper.ListInstances():
# Second and later attempts pass retry=True to the hypervisor.
1109 hyper.StopInstance(instance, retry=self.tried_once)
1110 except errors.HypervisorError, err:
1111 if iname not in hyper.ListInstances():
1112 # if the instance is no longer existing, consider this a
1113 # success and go to cleanup
1116 _Fail("Failed to stop instance %s: %s", iname, err)
1118 self.tried_once = True
1120 raise utils.RetryAgain()
# Poll every 5 seconds until the soft shutdown succeeds or times out.
1123 utils.Retry(_TryShutdown(), 5, timeout)
1124 except utils.RetryTimeout:
1125 # the shutdown did not succeed
1126 logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1129 hyper.StopInstance(instance, force=True)
1130 except errors.HypervisorError, err:
1131 if iname in hyper.ListInstances():
1132 # only raise an error if the instance still exists, otherwise
1133 # the error could simply be "instance ... unknown"!
1134 _Fail("Failed to force stop instance %s: %s", iname, err)
1138 if iname in hyper.ListInstances():
1139 _Fail("Could not shutdown instance %s even by destroy", iname)
1142 hyper.CleanupInstance(instance.name)
1143 except errors.HypervisorError, err:
# Cleanup failures are only warned about; the instance is already down.
1144 logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1146 _RemoveBlockDevLinks(iname, instance.disks)
# NOTE(review): this extract is missing lines — the 'try:' openers inside
# both reboot branches and the 'else:' before the final _Fail are not
# visible here; confirm against the complete file.
1149 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1150 """Reboot an instance.
1152 @type instance: L{objects.Instance}
1153 @param instance: the instance object to reboot
1154 @type reboot_type: str
1155 @param reboot_type: the type of reboot, one the following
1157 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1158 instance OS, do not recreate the VM
1159 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1160 restart the VM (at the hypervisor level)
1161 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1162 not accepted here, since that mode is handled differently, in
1163 cmdlib, and translates into full stop and start of the
1164 instance (instead of a call_instance_reboot RPC)
1165 @type shutdown_timeout: integer
1166 @param shutdown_timeout: maximum timeout for soft shutdown
1170 running_instances = GetInstanceList([instance.hypervisor])
1172 if instance.name not in running_instances:
1173 _Fail("Cannot reboot instance %s that is not running", instance.name)
1175 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1176 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1178 hyper.RebootInstance(instance)
1179 except errors.HypervisorError, err:
1180 _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1181 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
# A hard reboot is implemented as a full shutdown followed by a start.
1183 InstanceShutdown(instance, shutdown_timeout)
1184 return StartInstance(instance)
1185 except errors.HypervisorError, err:
1186 _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1188 _Fail("Invalid reboot_type received: %s", reboot_type)
# NOTE(review): this extract is missing lines — the 'try:' opener and the
# final 'return info' are not visible here; confirm against the complete
# file.
1191 def MigrationInfo(instance):
1192 """Gather information about an instance to be migrated.
1194 @type instance: L{objects.Instance}
1195 @param instance: the instance definition
1198 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1200 info = hyper.MigrationInfo(instance)
1201 except errors.HypervisorError, err:
1202 _Fail("Failed to fetch migration information: %s", err, exc=True)
def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError as err:
    _Fail("Failed to accept instance: %s", err, exc=True)
def FinalizeMigration(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigration(instance, info, success)
  except errors.HypervisorError as err:
    _Fail("Failed to finalize migration: %s", err, exc=True)
def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @rtype: tuple
  @return: a tuple of (success, msg) where:
      - success is a boolean denoting the success/failure of the operation
      - msg is a string with details in case of failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError as err:
    _Fail("Failed to migrate instance: %s", err, exc=True)
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete 'size' argument
  # pylint: disable-msg=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError as err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable-msg=E1103
          crdev.Open()
        except errors.BlockDeviceError as err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError as err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError as err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError as err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError as err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError as err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail as err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError as err:
        if children.count(None) >= mcn:
          # too many children failed already, give up
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)
  else:
    result = True
  return result
def BlockdevAssemble(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable-msg=E1103
      result = result.dev_path
  except errors.BlockDeviceError as err:
    _Fail("Error while assembling disk: %s", err, exc=True)

  return result
def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError as err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail as err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  # resolve every requested child; any that cannot be found is fatal
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      # no static path: look the device up to learn its current path
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  # resolve all children first, so FindDevice can match the full tree
  children = []
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open
  @return: the opened block device

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk
def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise a the current
      sync status

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError as err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      # device lookup failed: report the size as unknown
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  # only absolute paths from the allowed whitelist may be written
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)
1727 def _ErrnoOrStr(err):
1728 """Format an EnvironmentError exception.
1730 If the L{err} argument has an errno attribute, it will be looked up
1731 and converted into a textual C{E...} description. Otherwise the
1732 string representation of the error will be returned.
1734 @type err: L{EnvironmentError}
1735 @param err: the exception to format
1738 if hasattr(err, 'errno'):
1739 detail = errno.errorcode[err.errno]
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the vaid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError as err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError as err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError) as err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of support OS API versions

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError as err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          # on failure, os_inst holds the error message instead of an object
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type name: str
  @param name: the OS name to look for
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path names
  os_files = dict.fromkeys(constants.OS_SCRIPTS)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = ''

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = ''
  else:
    # the verify script is only supported from API v20 onwards
    del os_files[constants.OS_SCRIPT_VERIFY]

  for filename in os_files:
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError as err:
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError as err:
      return False, ("Error while reading the OS variants file at %s: %s" %
                     (variants_file, _ErrnoOrStr(err)))
    if not variants:
      return False, ("No supported os variant found")

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError as err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, _ErrnoOrStr(err)))
    # each non-empty line is "<name> <help text>"
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # strip any "+variant" suffix before the lookup
  name_only = name.split("+", 1)[0]
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload
def OSCoreEnv(inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['OS_NAME'] = inst_os.name
  result['DEBUG_LEVEL'] = '%d' % debug

  # OS variants
  if api_version >= constants.OS_API_V15:
    try:
      variant = inst_os.name.split('+', 1)[1]
    except IndexError:
      # no "+variant" suffix: fall back to the first supported variant
      variant = inst_os.supported_variants[0]
    result['OS_VARIANT'] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result['OSP_%s' % pname.upper()] = pvalue

  return result
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(inst_os, instance.osparams, debug=debug)

  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
def BlockdevGrow(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in MiB) to grow with
  @rtype: (status, result)
  @return: a tuple with the status of the operation
      (True/False), and the errors message if status
      is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount)
  except errors.BlockDeviceError as err:
    _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.dev_type == constants.LD_DRBD8:
    # recurse into the single backing child of the DRBD device
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file
  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
                 '%s' % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # atomically (as far as rmtree/move allow) replace any previous export
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
  else:
    _Fail("No exports directory")
def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(constants.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError as err:
    _Fail("Error while removing the export: %s", err, exc=True)
def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
      rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError as err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks wheter the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check
  @rtype: str
  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  # NOTE(review): the visible code read `cfg` without binding it; it is
  # presumably obtained from the node's ssconf wrapper — confirm helper name
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
      base_file_storage_dir):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
2302 def CreateFileStorageDir(file_storage_dir):
2303   """Create file storage directory.
2305   @type file_storage_dir: str
2306   @param file_storage_dir: directory to create
2309   @return: tuple with first element a boolean indicating whether dir
2310       creation was successful or not
# Validate the path against the cluster base dir before touching the fs.
2313   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2314   if os.path.exists(file_storage_dir):
# An existing non-directory at the target path is a hard error; an
# existing directory is implicitly accepted.
2315     if not os.path.isdir(file_storage_dir):
2316       _Fail("Specified storage dir '%s' is not a directory",
# NOTE(review): the else/try lines between the check above and makedirs
# below are elided in this view.
2320       os.makedirs(file_storage_dir, 0750)
2321     except OSError, err:
2322       _Fail("Cannot create file storage directory '%s': %s",
2323             file_storage_dir, err, exc=True)
2326 def RemoveFileStorageDir(file_storage_dir):
2327   """Remove file storage directory.
2329   Remove it only if it's empty. If not log an error and return.
2331   @type file_storage_dir: str
2332   @param file_storage_dir: the directory we should cleanup
2333   @rtype: tuple (success,)
2334   @return: tuple of one element, C{success}, denoting
2335       whether the operation was successful
2338   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2339   if os.path.exists(file_storage_dir):
2340     if not os.path.isdir(file_storage_dir):
2341       _Fail("Specified Storage directory '%s' is not a directory",
2343     # deletes dir only if empty, otherwise we want to fail the rpc call
# os.rmdir raises OSError (ENOTEMPTY) for a non-empty directory, which is
# turned into an RPC failure below — this is the "only if empty" guarantee.
2345       os.rmdir(file_storage_dir)
2346     except OSError, err:
2347       _Fail("Cannot remove file storage directory '%s': %s",
2348             file_storage_dir, err)
2351 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2352   """Rename the file storage directory.
2354   @type old_file_storage_dir: str
2355   @param old_file_storage_dir: the current path
2356   @type new_file_storage_dir: str
2357   @param new_file_storage_dir: the name we should rename to
2358   @rtype: tuple (success,)
2359   @return: tuple of one element, C{success}, denoting
2360       whether the operation was successful
# Both endpoints must be valid paths under the cluster base directory.
2363   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2364   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2365   if not os.path.exists(new_file_storage_dir):
2366     if os.path.isdir(old_file_storage_dir):
2368         os.rename(old_file_storage_dir, new_file_storage_dir)
2369       except OSError, err:
2370         _Fail("Cannot rename '%s' to '%s': %s",
2371               old_file_storage_dir, new_file_storage_dir, err)
2373       _Fail("Specified storage dir '%s' is not a directory",
2374             old_file_storage_dir)
# If both source and destination exist we cannot rename safely; a missing
# source with an existing destination is treated as already-done (no-op).
2376     if os.path.exists(old_file_storage_dir):
2377       _Fail("Cannot rename '%s' to '%s': both locations exist",
2378             old_file_storage_dir, new_file_storage_dir)
2381 def _EnsureJobQueueFile(file_name):
2382   """Checks whether the given filename is in the queue directory.
2384   @type file_name: str
2385   @param file_name: the file name we should check
2387   @raises RPCFail: if the file is not valid
2390   queue_dir = os.path.normpath(constants.QUEUE_DIR)
# True iff file_name lies under the queue directory (string-prefix check).
2391   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
# NOTE(review): the `if not result:` guard line is elided in this view.
2394     _Fail("Passed job queue file '%s' does not belong to"
2395           " the queue directory '%s'", file_name, queue_dir)
2398 def JobQueueUpdate(file_name, content):
2399   """Updates a file in the queue directory.
2401   This is just a wrapper over L{utils.WriteFile}, with proper
2404   @type file_name: str
2405   @param file_name: the job file name
2407   @param content: the new job contents
2409   @return: the success of the operation
# Reject paths outside the queue directory before writing anything.
2412   _EnsureJobQueueFile(file_name)
2414   # Write and replace the file atomically
2415   utils.WriteFile(file_name, data=_Decompress(content))
2418 def JobQueueRename(old, new):
2419   """Renames a job queue file.
2421   This is just a wrapper over os.rename with proper checking.
2424   @param old: the old (actual) file name
2426   @param new: the desired file name
2428   @return: the success of the operation and payload
# Both the source and the destination must live inside the queue dir.
2431   _EnsureJobQueueFile(old)
2432   _EnsureJobQueueFile(new)
# mkdir=True creates the destination's parent directory if needed
# (e.g. when archiving into a not-yet-existing subdirectory).
2434   utils.RenameFile(old, new, mkdir=True)
2437 def BlockdevClose(instance_name, disks):
2438   """Closes the given block devices.
2440   This means they will be switched to secondary mode (in case of
2443   @param instance_name: if the argument is not empty, the symlinks
2444       of this instance will be removed
2445   @type disks: list of L{objects.Disk}
2446   @param disks: the list of disks to be closed
2447   @rtype: tuple (success, message)
2448   @return: a tuple of success and message, where success
2449       indicates the success of the operation, and message
2450       which will contain the error details in case we
# NOTE(review): the msg-list initialization, the loop header and the
# per-device Close() call are elided in this view.
2456     rd = _RecursiveFindBD(cf)
2458       _Fail("Can't find device %s", cf)
2465     except errors.BlockDeviceError, err:
2466       msg.append(str(err))
# All per-device failures are aggregated into one RPC failure.
2468     _Fail("Can't make devices secondary: %s", ",".join(msg))
# Only remove the instance symlinks when an instance name was given.
2471     _RemoveBlockDevLinks(instance_name, disks)
2474 def ValidateHVParams(hvname, hvparams):
2475   """Validates the given hypervisor parameters.
2477   @type hvname: string
2478   @param hvname: the hypervisor name
2479   @type hvparams: dict
2480   @param hvparams: the hypervisor parameters to be validated
# NOTE(review): the try: line opening this block is elided in this view.
2485     hv_type = hypervisor.GetHypervisor(hvname)
2486     hv_type.ValidateParameters(hvparams)
2487   except errors.HypervisorError, err:
# log=False: the caller reports this, so avoid duplicate log entries here.
2488     _Fail(str(err), log=False)
2491 def _CheckOSPList(os_obj, parameters):
2492   """Check whether a list of parameters is supported by the OS.
2494   @type os_obj: L{objects.OS}
2495   @param os_obj: OS object to check
2496   @type parameters: list
2497   @param parameters: the list of parameters to check
# supported_parameters entries are (name, description) pairs; only the
# names matter for the membership check.
2500   supported = [v[0] for v in os_obj.supported_parameters]
2501   delta = frozenset(parameters).difference(supported)
# NOTE(review): the `if delta:` guard line is elided in this view.
2503     _Fail("The following parameters are not supported"
2504           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2507 def ValidateOS(required, osname, checks, osparams):
2508   """Validate the given OS' parameters.
2510   @type required: boolean
2511   @param required: whether absence of the OS should translate into
2513   @type osname: string
2514   @param osname: the OS to be validated
2516   @param checks: list of the checks to run (currently only 'parameters')
2517   @type osparams: dict
2518   @param osparams: dictionary with OS parameters
2520   @return: True if the validation passed, or False if the OS was not
2521       found and L{required} was false
# Reject unknown check names up front.
2524   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2525     _Fail("Unknown checks required for OS %s: %s", osname,
2526           set(checks).difference(constants.OS_VALIDATE_CALLS))
# Variants ("name+variant") share the on-disk OS of their base name.
2528   name_only = osname.split("+", 1)[0]
2529   status, tbv = _TryOSFromDisk(name_only, None)
# NOTE(review): the handling of a failed lookup (status false / required)
# is elided between here and the API-version check below.
2537   if max(tbv.api_versions) < constants.OS_API_V20:
2540   if constants.OS_VALIDATE_PARAMETERS in checks:
2541     _CheckOSPList(tbv, osparams.keys())
2543   validate_env = OSCoreEnv(tbv, osparams)
2544   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
# NOTE(review): the failure guard on `result` is elided before this logging.
2547     logging.error("os validate command '%s' returned error: %s output: %s",
2548                   result.cmd, result.fail_reason, result.output)
# log=False since we already logged the full details just above.
2549     _Fail("OS validation script failed (%s), output: %s",
2550           result.fail_reason, result.output, log=False)
# NOTE(review): the def line of this function is elided in this view; from
# the docstring and body this is the master-candidate demotion routine.
2556   """Demotes the current node from master candidate role.
2559   # try to ensure we're not the master by mistake
2560   master, myself = ssconf.GetMasterAndMyself()
2561   if master == myself:
2562     _Fail("ssconf status shows I'm the master node, will not demote")
# A running master daemon also blocks demotion (RunCmd "check" succeeds
# when the daemon is up, i.e. result.failed is False).
2564   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2565   if not result.failed:
2566     _Fail("The master daemon is running, will not demote")
# NOTE(review): the try: opening this backup block is elided in this view.
2569     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2570       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2571   except EnvironmentError, err:
# A missing config file is fine (nothing to back up); anything else is fatal.
2572     if err.errno != errno.ENOENT:
2573       _Fail("Error while backing up cluster file: %s", err, exc=True)
2575   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2578 def _GetX509Filenames(cryptodir, name):
2579   """Returns the full paths for the private key and certificate.
# Returns (cert_dir, key_path, cert_path); the certificate "name" is the
# per-certificate subdirectory under cryptodir.
2582   return (utils.PathJoin(cryptodir, name),
2583           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2584           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2587 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2588   """Creates a new X509 certificate for SSL/TLS.
2591   @param validity: Validity in seconds
2592   @rtype: tuple; (string, string)
2593   @return: Certificate name and public part
# Cap validity at the configured maximum (_MAX_SSL_CERT_VALIDITY).
2596   (key_pem, cert_pem) = \
2597     utils.GenerateSelfSignedX509Cert(netutils.HostInfo.SysName(),
2598                                      min(validity, _MAX_SSL_CERT_VALIDITY))
# mkdtemp gives a unique, private (0700) per-certificate directory; its
# basename doubles as the certificate's public name.
2600   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2601                               prefix="x509-%s-" % utils.TimestampForFilename())
2603   name = os.path.basename(cert_dir)
2604   assert len(name) > 5
2606   (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
# NOTE(review): the try: wrapping these writes (paired with the rmtree
# cleanup below) is elided in this view.
2608     utils.WriteFile(key_file, mode=0400, data=key_pem)
2609     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2611     # Never return private key as it shouldn't leave the node
2612     return (name, cert_pem)
# On any failure, remove the partially-created certificate directory.
2614     shutil.rmtree(cert_dir, ignore_errors=True)
2618 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2619   """Removes a X509 certificate.
2622   @param name: Certificate name
2625   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
# Remove the key and certificate files first, then the directory itself
# (the rmdir and the try: around it are elided in this view).
2627   utils.RemoveFile(key_file)
2628   utils.RemoveFile(cert_file)
2632   except EnvironmentError, err:
2633     _Fail("Cannot remove certificate directory '%s': %s",
2637 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2638   """Returns the command for the requested input/output.
2640   @type instance: L{objects.Instance}
2641   @param instance: The instance object
2642   @param mode: Import/export mode
2643   @param ieio: Input/output type
2644   @param ieargs: Input/output arguments
# Returns (env, prefix, suffix, exp_size): extra environment for the
# daemon, shell fragments to prepend/append to the transfer pipeline, and
# the expected transfer size (None when unknown).
2647   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
# NOTE(review): the initialization of env/prefix/suffix/exp_size defaults
# is elided in this view.
2654   if ieio == constants.IEIO_FILE:
2655     (filename, ) = ieargs
2657     if not utils.IsNormAbsPath(filename):
2658       _Fail("Path '%s' is not normalized or absolute", filename)
2660     directory = os.path.normpath(os.path.dirname(filename))
# Confine file-based transfers to the exports directory.
2662     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2663         constants.EXPORT_DIR):
2664       _Fail("File '%s' is not under exports directory '%s'",
2665             filename, constants.EXPORT_DIR)
2668     utils.Makedirs(directory, mode=0750)
2670     quoted_filename = utils.ShellQuote(filename)
# Import writes into the file; export reads from it.
2672     if mode == constants.IEM_IMPORT:
2673       suffix = "> %s" % quoted_filename
2674     elif mode == constants.IEM_EXPORT:
2675       suffix = "< %s" % quoted_filename
2677     # Retrieve file size
2679       st = os.stat(filename)
2680     except EnvironmentError, err:
2681       logging.error("Can't stat(2) %s: %s", filename, err)
2683       exp_size = utils.BytesToMebibyte(st.st_size)
2685   elif ieio == constants.IEIO_RAW_DISK:
2688     real_disk = _OpenRealBD(disk)
2690     if mode == constants.IEM_IMPORT:
2691       # we set here a smaller block size as, due to transport buffering, more
2692       # than 64-128k will mostly be ignored; we use nocreat to fail if the
2693       # device is not already there or we pass a wrong path; we use notrunc
2694       # to not attempt a truncate on an LV device; we use oflag=dsync to not
2695       # buffer too much memory; this means that at best, we flush every 64k,
2697       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2698                                     " bs=%s oflag=dsync"),
2702     elif mode == constants.IEM_EXPORT:
2703       # the block size on the read dd is 1MiB to match our units
2704       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2706                                    str(1024 * 1024), # 1 MB
2708       exp_size = disk.size
2710   elif ieio == constants.IEIO_SCRIPT:
2711     (disk, disk_index, ) = ieargs
2713     assert isinstance(disk_index, (int, long))
2715     real_disk = _OpenRealBD(disk)
# The OS-provided import/export scripts run with the usual OS environment
# plus transfer-specific variables set below.
2717     inst_os = OSFromDisk(instance.os)
2718     env = OSEnvironment(instance, inst_os)
2720     if mode == constants.IEM_IMPORT:
2721       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2722       env["IMPORT_INDEX"] = str(disk_index)
2723       script = inst_os.import_script
2725     elif mode == constants.IEM_EXPORT:
2726       env["EXPORT_DEVICE"] = real_disk.dev_path
2727       env["EXPORT_INDEX"] = str(disk_index)
2728       script = inst_os.export_script
2730     # TODO: Pass special environment only to script
2731     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2733     if mode == constants.IEM_IMPORT:
2734       suffix = "| %s" % script_cmd
2736     elif mode == constants.IEM_EXPORT:
2737       prefix = "%s |" % script_cmd
2739     # Let script predict size
2740     exp_size = constants.IE_CUSTOM_SIZE
# NOTE(review): the else: for the unknown-ieio case is elided above this.
2743     _Fail("Invalid %s I/O mode %r", mode, ieio)
2745   return (env, prefix, suffix, exp_size)
2748 def _CreateImportExportStatusDir(prefix):
2749   """Creates status directory for import/export.
# mkdtemp guarantees a unique directory; its basename later serves as the
# public name of the import/export operation.
2752   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2754                                   (prefix, utils.TimestampForFilename())))
2757 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2758   """Starts an import or export daemon.
2760   @param mode: Import/output mode
2761   @type opts: L{objects.ImportExportOptions}
2762   @param opts: Daemon options
2764   @param host: Remote host for export (None for import)
2766   @param port: Remote port for export (None for import)
2767   @type instance: L{objects.Instance}
2768   @param instance: Instance object
2769   @param ieio: Input/output type
2770   @param ieioargs: Input/output arguments
2773   if mode == constants.IEM_IMPORT:
# Imports listen locally; a host/port would be meaningless.
2776     if not (host is None and port is None):
2777       _Fail("Can not specify host or port on import")
2779   elif mode == constants.IEM_EXPORT:
2782     if host is None or port is None:
2783       _Fail("Host and port must be specified for an export")
2786     _Fail("Invalid mode %r", mode)
# Either both a key name and a CA are given, or neither (cluster cert).
2788   if (opts.key_name is None) ^ (opts.ca_pem is None):
2789     _Fail("Cluster certificate can only be used for both key and CA")
2791   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2792     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2794   if opts.key_name is None:
# Use the node daemon's certificate (which bundles key and cert).
2796     key_path = constants.NODED_CERT_FILE
2797     cert_path = constants.NODED_CERT_FILE
2798     assert opts.ca_pem is None
2800     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2802     assert opts.ca_pem is not None
2804   for i in [key_path, cert_path]:
2805     if not os.path.exists(i):
2806       _Fail("File '%s' does not exist" % i)
2808   status_dir = _CreateImportExportStatusDir(prefix)
# NOTE(review): the try: pairing with the rmtree cleanup at the end, the
# `prefix` computation and parts of the cmd list are elided in this view.
2810     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2811     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2812     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2814     if opts.ca_pem is None:
2816       ca = utils.ReadFile(constants.NODED_CERT_FILE)
# The CA is written 0400 so only the daemon's user can read it.
2821     utils.WriteFile(ca_file, data=ca, mode=0400)
2824       constants.IMPORT_EXPORT_DAEMON,
2826       "--key=%s" % key_path,
2827       "--cert=%s" % cert_path,
2828       "--ca=%s" % ca_file,
2832       cmd.append("--host=%s" % host)
2835       cmd.append("--port=%s" % port)
2838       cmd.append("--compress=%s" % opts.compress)
2841       cmd.append("--magic=%s" % opts.magic)
2843     if exp_size is not None:
2844       cmd.append("--expected-size=%s" % exp_size)
2847       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2850       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2852     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2854     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2855     # support for receiving a file descriptor for output
2856     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2859     # The import/export name is simply the status directory name
2860     return os.path.basename(status_dir)
# On failure, clean up the status directory so no orphan entry remains.
2863     shutil.rmtree(status_dir, ignore_errors=True)
2867 def GetImportExportStatus(names):
2868   """Returns import/export daemon status.
2870   @type names: sequence
2871   @param names: List of names
2872   @rtype: List of dicts
2873   @return: Returns a list of the state of each named import/export or None if a
2874       status couldn't be read
# NOTE(review): the result-list initialization and loop header are elided
# in this view.
2880     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2884       data = utils.ReadFile(status_file)
2885     except EnvironmentError, err:
# A missing status file maps to a None entry; other errors propagate
# (the re-raise lines are elided in this view).
2886       if err.errno != errno.ENOENT:
2894       result.append(serializer.LoadJson(data))
2899 def AbortImportExport(name):
2900   """Sends SIGTERM to a running import/export daemon.
2903   logging.info("Abort import/export %s", name)
2905   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
# ReadLockedPidFile returns the PID only while the daemon still holds the
# lock, so a stale PID file is not signalled.
2906   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2909     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
# Ignore ESRCH: the daemon may exit between the PID read and the kill.
2911     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2914 def CleanupImportExport(name):
2915   """Cleanup after an import or export.
2917   If the import/export daemon is still running it's killed. Afterwards the
2918   whole status directory is removed.
2921   logging.info("Finalizing import/export %s", name)
2923   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2925   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2928     logging.info("Import/export %s is still running with PID %s",
# waitpid=False: don't block the RPC on the daemon's actual exit.
2930     utils.KillProcess(pid, waitpid=False)
# Best-effort removal; the directory may already be partially gone.
2932   shutil.rmtree(status_dir, ignore_errors=True)
2935 def _FindDisks(nodes_ip, disks):
2936   """Sets the physical ID on disks and returns the block devices.
2939   # set the correct physical ID
2940   my_name = netutils.HostInfo().name
# NOTE(review): the loop header over `disks` (binding `cf`) and the
# bdevs accumulation are elided in this view.
2942     cf.SetPhysicalID(my_name, nodes_ip)
2947     rd = _RecursiveFindBD(cf)
2949       _Fail("Can't find device %s", cf)
2954 def DrbdDisconnectNet(nodes_ip, disks):
2955   """Disconnects the network on a list of drbd devices.
2958   bdevs = _FindDisks(nodes_ip, disks)
# NOTE(review): the per-device loop and DisconnectNet() call are elided in
# this view; only the error handler is visible.
2964     except errors.BlockDeviceError, err:
2965       _Fail("Can't change network configuration to standalone mode: %s",
2969 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2970   """Attaches the network on a list of drbd devices.
2973   bdevs = _FindDisks(nodes_ip, disks)
# In multimaster mode (live migration) the device symlinks must exist on
# this node as well; the `if multimaster:` guard is elided in this view.
2976     for idx, rd in enumerate(bdevs):
2978         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2979       except EnvironmentError, err:
2980         _Fail("Can't create symlink: %s", err)
2981   # reconnect disks, switch to new master configuration and if
2982   # needed primary mode
2985       rd.AttachNet(multimaster)
2986     except errors.BlockDeviceError, err:
2987       _Fail("Can't change network configuration: %s", err)
2989   # wait until the disks are connected; we need to retry the re-attach
2990   # if the device becomes standalone, as this might happen if the one
2991   # node disconnects and reconnects in a different mode before the
2992   # other node reconnects; in this case, one or both of the nodes will
2993   # decide it has wrong configuration and switch to standalone
# Retry callback: raises utils.RetryAgain until all devices are connected
# or in resync (its def line is elided in this view).
2996     all_connected = True
2999       stats = rd.GetProcStatus()
3001       all_connected = (all_connected and
3002                        (stats.is_connected or stats.is_in_resync))
3004       if stats.is_standalone:
3005         # peer had different config info and this node became
3006         # standalone, even though this should not happen with the
3007         # new staged way of changing disk configs
3009           rd.AttachNet(multimaster)
3010         except errors.BlockDeviceError, err:
3011           _Fail("Can't change network configuration: %s", err)
3013     if not all_connected:
3014       raise utils.RetryAgain()
3017     # Start with a delay of 100 milliseconds and go up to 5 seconds
3018     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3019   except utils.RetryTimeout:
3020     _Fail("Timeout in disk reconnecting")
3023     # change to primary mode
# NOTE(review): the multimaster guard, loop and Open() call around this
# error handler are elided in this view.
3027       except errors.BlockDeviceError, err:
3028         _Fail("Can't change to primary mode: %s", err)
3031 def DrbdWaitSync(nodes_ip, disks):
3032   """Wait until DRBDs have synchronized.
# Inner retry helper: raises RetryAgain while the device is neither
# connected nor resyncing (its def line is elided in this view).
3036     stats = rd.GetProcStatus()
3037     if not (stats.is_connected or stats.is_in_resync):
3038       raise utils.RetryAgain()
3041   bdevs = _FindDisks(nodes_ip, disks)
# NOTE(review): the alldone/min_resync initialization and the per-device
# loop with its try: are elided in this view.
3047       # poll each second for 15 seconds
3048       stats = utils.Retry(_helper, 1, 15, args=[rd])
3049     except utils.RetryTimeout:
3050       stats = rd.GetProcStatus()
3052       if not (stats.is_connected or stats.is_in_resync):
3053         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
# Track whether every device finished resync and the minimum progress seen.
3054     alldone = alldone and (not stats.is_in_resync)
3055     if stats.sync_percent is not None:
3056       min_resync = min(min_resync, stats.sync_percent)
3058   return (alldone, min_resync)
3061 def GetDrbdUsermodeHelper():
3062   """Returns DRBD usermode helper currently configured.
# NOTE(review): the try: and the _Fail in the except branch are elided in
# this view.
3066     return bdev.BaseDRBD.GetUsermodeHelper()
3067   except errors.BlockDeviceError, err:
3071 def PowercycleNode(hypervisor_type):
3072   """Hard-powercycle the node.
3074   Because we need to return first, and schedule the powercycle in the
3075   background, we won't be able to report failures nicely.
3078   hyper = hypervisor.GetHypervisor(hypervisor_type)
# NOTE(review): the os.fork() call and the parent-process return path are
# elided in this view; the comment below refers to that fork.
3082     # if we can't fork, we'll pretend that we're in the child process
3085   return "Reboot scheduled in 5 seconds"
3086   # ensure the child is running on ram
3089   except Exception: # pylint: disable-msg=W0703
# Best-effort: even if pinning to RAM fails, proceed with the powercycle.
3092   hyper.PowercycleNode()
3095 class HooksRunner(object):
3098   This class is instantiated on the node side (ganeti-noded) and not
3102   def __init__(self, hooks_base_dir=None):
3103     """Constructor for hooks runner.
3105     @type hooks_base_dir: str or None
3106     @param hooks_base_dir: if not None, this overrides the
3107         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3110     if hooks_base_dir is None:
3111       hooks_base_dir = constants.HOOKS_BASE_DIR
3112     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3114     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3116   def RunHooks(self, hpath, phase, env):
3117     """Run the scripts in the hooks directory.
3120     @param hpath: the path to the hooks directory which
3123     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3124         L{constants.HOOKS_PHASE_POST}
3126     @param env: dictionary with the environment for the hook
3128     @return: list of 3-element tuples:
3130       - script result, either L{constants.HKR_SUCCESS} or
3131         L{constants.HKR_FAIL}
3132       - output of the script
3134     @raise errors.ProgrammerError: for invalid input
# Map the phase to the directory suffix (the suffix assignments are
# elided in this view).
3138     if phase == constants.HOOKS_PHASE_PRE:
3140     elif phase == constants.HOOKS_PHASE_POST:
3143       _Fail("Unknown hooks phase '%s'", phase)
3146     subdir = "%s-%s.d" % (hpath, suffix)
3147     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3151     if not os.path.isdir(dir_name):
3152       # for non-existing/non-dirs, we simply exit instead of logging a
3153       # warning at every operation
3156     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
# Translate each RunParts outcome into a hook result (HKR_*) and output.
3158     for (relname, relstatus, runresult) in runparts_results:
3159       if relstatus == constants.RUNPARTS_SKIP:
3160         rrval = constants.HKR_SKIP
3162       elif relstatus == constants.RUNPARTS_ERR:
3163         rrval = constants.HKR_FAIL
3164         output = "Hook script execution error: %s" % runresult
3165       elif relstatus == constants.RUNPARTS_RUN:
3166         if runresult.failed:
3167           rrval = constants.HKR_FAIL
3169           rrval = constants.HKR_SUCCESS
3170         output = utils.SafeEncode(runresult.output.strip())
3171       results.append(("%s/%s" % (subdir, relname), rrval, output))
3176 class IAllocatorRunner(object):
3177   """IAllocator runner.
3179   This class is instantiated on the node side (ganeti-noded) and not on
# NOTE(review): the @staticmethod decorator line is elided above Run.
3184   def Run(name, idata):
3185     """Run an iallocator script.
3188     @param name: the iallocator script name
3190     @param idata: the allocator input data
3193     @return: two element tuple of:
3195        - either error message or stdout of allocator (for success)
3198     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3200     if alloc_script is None:
3201       _Fail("iallocator module '%s' not found in the search path", name)
# The input data is passed via a temporary file whose name is given as
# the script's only argument; the write/close and the try/finally with
# the cleanup of fin_name are elided in this view.
3203     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3207       result = utils.RunCmd([alloc_script, fin_name])
3209         _Fail("iallocator module '%s' failed: %s, output '%s'",
3210               name, result.fail_reason, result.output)
3214     return result.stdout
3217 class DevCacheManager(object):
3218   """Simple class for managing a cache of block device information.
# Prefix stripped from device paths, and the on-disk cache directory.
3221   _DEV_PREFIX = "/dev/"
3222   _ROOT_DIR = constants.BDEV_CACHE_DIR
3225   def _ConvertPath(cls, dev_path):
3226     """Converts a /dev/name path to the cache file name.
3228     This replaces slashes with underscores and strips the /dev
3229     prefix. It then returns the full path to the cache file.
3232     @param dev_path: the C{/dev/} path name
3234     @return: the converted path name
3237     if dev_path.startswith(cls._DEV_PREFIX):
3238       dev_path = dev_path[len(cls._DEV_PREFIX):]
3239     dev_path = dev_path.replace("/", "_")
3240     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
# NOTE(review): the return of fpath and the @classmethod decorators are
# elided in this view.
3244   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3245     """Updates the cache information for a given device.
3248     @param dev_path: the pathname of the device
3250     @param owner: the owner (instance name) of the device
3251     @type on_primary: bool
3252     @param on_primary: whether this is the primary
3255     @param iv_name: the instance-visible name of the
3256         device, as in objects.Disk.iv_name
3261     if dev_path is None:
3262       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3264     fpath = cls._ConvertPath(dev_path)
# NOTE(review): the computation of `state` from on_primary is elided here.
3270       iv_name = "not_visible"
3271     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
# Cache write failures are logged but never propagated: the cache is an
# optimization, not authoritative state.
3273       utils.WriteFile(fpath, data=fdata)
3274     except EnvironmentError, err:
3275       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3278   def RemoveCache(cls, dev_path):
3279     """Remove data for a dev_path.
3281     This is just a wrapper over L{utils.RemoveFile} with a converted
3282     path name and logging.
3285     @param dev_path: the pathname of the device
3290     if dev_path is None:
3291       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3293     fpath = cls._ConvertPath(dev_path)
3295       utils.RemoveFile(fpath)
3296     except EnvironmentError, err:
3297       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)