4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
    the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
    in the L{_CleanDirectory} function

"""
31 # pylint: disable-msg=E1103
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
# File whose contents change on every boot; used to detect node restarts
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"

# Directories the node daemon is allowed to wipe via _CleanDirectory();
# any other path is rejected to avoid accidental data loss.
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.CRYPTO_KEYS_DIR,
  ])

# Maximum accepted validity for generated SSL certificates: one week
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60

# Base names of the X509 key/certificate files and the import/export
# status file inside a per-operation directory
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """
def _Fail(msg, *args, **kwargs):
  """Log an error and the raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail: always, with the (interpolated) message as argument

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]:  # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      # log with the current exception's traceback attached
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    # payload was sent verbatim
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # payload is base64-encoded, zlib-compressed data
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  # refuse to clean anything outside the whitelist
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    # only plain files are removed; symlinks and subdirectories are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  @rtype: frozenset
  @return: the set of paths which L{UploadFile} may write to

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  # every hypervisor may declare additional ancillary files
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  """
  # keep the lock file so the queue stays usable afterwards
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
223 """Returns master information.
225 This is an utility function to compute master information, either
226 for consumption here or from the node daemon.
229 @return: master_netdev, master_ip, master_name, primary_ip_family
230 @raise RPCFail: in case of errors
235 master_netdev = cfg.GetMasterNetdev()
236 master_ip = cfg.GetMasterIP()
237 master_node = cfg.GetMasterNode()
238 primary_ip_family = cfg.GetPrimaryIPFamily()
239 except errors.ConfigurationError, err:
240 _Fail("Cluster configuration incomplete: %s", err, exc=True)
241 return (master_netdev, master_ip, master_node, primary_ip_family)
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will either try activate the IP address of the master
  (unless someone else has it) or also start the master daemons, based
  on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to start the master daemons
      (ganeti-masterd and ganeti-rapi), or (if false) activate the
      master ip
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  err_msgs = []
  # either start the master and rapi daemons
  if start_daemons:
    if no_voting:
      masterd_args = "--no-voting --yes-do-it"
    else:
      masterd_args = ""

    env = {
      "EXTRA_MASTERD_ARGS": masterd_args,
      }

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      msg = "Can't start Ganeti master: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)
  # or activate the IP
  else:
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
      if netutils.IPAddress.Own(master_ip):
        # we already have the ip:
        logging.debug("Master IP already configured, doing nothing")
      else:
        msg = "Someone else has the master ip, not activating"
        logging.error(msg)
        err_msgs.append(msg)
    else:
      ipcls = netutils.IP4Address
      if family == netutils.IP6Address.family:
        ipcls = netutils.IP6Address

      result = utils.RunCmd(["ip", "address", "add",
                             "%s/%d" % (master_ip, ipcls.iplen),
                             "dev", master_netdev, "label",
                             "%s:0" % master_netdev])
      if result.failed:
        msg = "Can't activate master IP: %s" % result.output
        logging.error(msg)
        err_msgs.append(msg)

      # we ignore the exit code of the following cmds
      if ipcls == netutils.IP4Address:
        # gratuitous ARP so the neighbours learn the IP's new location
        utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
                      master_ip, master_ip])
      elif ipcls == netutils.IP6Address:
        try:
          utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
        except errors.OpExecError:
          # TODO: Better error reporting
          logging.warning("Can't execute ndisc6, please install if missing")

  if err_msgs:
    _Fail("; ".join(err_msgs))
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  ipcls = netutils.IP4Address
  if family == netutils.IP6Address.family:
    ipcls = netutils.IP6Address

  result = utils.RunCmd(["ip", "address", "del",
                         "%s/%d" % (master_ip, ipcls.iplen),
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                    " and error %s",
                    result.cmd, result.exit_code, result.output)
def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      # _Fail (instead of a bare RPCFail construction) actually raises,
      # reporting the invalid call back to the master
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      # best-effort: log the problem but continue the removal
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable-msg=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  outputarray = {}

  vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
  vg_free = vg_size = None
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  outputarray['vg_size'] = vg_size
  outputarray['vg_free'] = vg_free

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # boot id changes on every reboot, used for crash detection
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return outputarray
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  # NOTE(review): several statements of this function (e.g. the
  # initialization of the "result" dict and the final return) are not
  # visible in this chunk of the file.
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)

  # per-hypervisor verification, delegated to each hypervisor class
  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)

  # checksum the requested files
  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  # ssh connectivity/hostname check; shuffled so the load is spread
  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)

  # TCP connectivity check against primary/secondary IPs of other nodes
  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if not netutils.TcpPing(pip, port, source=my_pip):
        fail.append("primary")
      if not netutils.TcpPing(sip, port, source=my_sip):
        fail.append("secondary")
      tmp[name] = ("failure using the %s interface(s)" %

  # reachability of the master IP from this node
  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,

  # logical volume list for the given volume group
  if constants.NV_LVLIST in what:
      val = GetVolumeList(what[constants.NV_LVLIST])
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what:
    # GetInstanceList can fail
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  # protocol/release versions of this node
  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  # DRBD minors in use on this node
  if constants.NV_DRBDLIST in what:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
    result[constants.NV_DRBDHELPER] = (status, payload)

  # basic sanity checks on /sys and /proc availability
  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  # current node time, for clock-skew checks on the master
  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what:
    result[constants.NV_OSLIST] = DiagnoseOS()
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  # name | size | six-char attribute string
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[name] = (size, inactive, online)

  return lvs
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    # strip the "(extent)" suffix lvs appends to each device
    return dev.split('(')[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{'name': line[0], 'size': line[1],
             'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count('|') >= 3:
      all_devs.extend(map_line(line.split('|')))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
737 def GetInstanceList(hypervisor_list):
738 """Provides a list of instances.
740 @type hypervisor_list: list
741 @param hypervisor_list: the list of hypervisors to query information
744 @return: a list of all running instances on the current node
745 - instance1.example.com
746 - instance2.example.com
750 for hname in hypervisor_list:
752 names = hypervisor.GetHypervisor(hname).ListInstances()
753 results.extend(names)
754 except errors.HypervisorError, err:
755 _Fail("Error enumerating instances (hypervisor %s): %s",
756 hname, err, exc=True)
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # iinfo is (name, id, memory, vcpus, state, times)
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # missing disk symlinks are only warned about, not fatal
  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output
def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @rtype: string
  @return: the full path of the log file inside the OS log directory

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  base = ("%s-%s-%s-%s.log" %
          (kind, os_name, instance, utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, base)
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    # let the OS create script know this is a reinstall
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    # include the tail of the log in the failure message for diagnosis
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    logging.error("os create command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    # include the tail of the log in the failure message for diagnosis
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for one of an instance's disks.

  @param instance_name: name of the owning instance
  @param idx: zero-based index of the disk
  @return: path inside the disk links directory, named "instance:idx"

  """
  link_basename = "%s:%d" % (instance_name, idx)
  return utils.PathJoin(constants.DISK_LINKS_DIR, link_basename)
944 def _SymlinkBlockDev(instance_name, device_path, idx):
945 """Set up symlinks to a instance's block device.
947 This is an auxiliary function run when an instance is start (on the primary
948 node) or when an instance is migrated (on the target node).
951 @param instance_name: the name of the target instance
952 @param device_path: path of the physical block device, on the node
953 @param idx: the disk index
954 @return: absolute path to the disk's symlink
957 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
959 os.symlink(device_path, link_name)
961 if err.errno == errno.EEXIST:
962 if (not os.path.islink(link_name) or
963 os.readlink(link_name) != device_path):
965 os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @param instance_name: the name of the owning instance
  @param disks: the instance's disk objects (only their count is used)

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        # best-effort cleanup: log and continue with the other links
        logging.exception("Can't remove symlink '%s'", link_name)
985 def _GatherAndLinkBlockDevs(instance):
986 """Set up an instance's block device(s).
988 This is run on the primary node at instance startup. The block
989 devices must be already assembled.
991 @type instance: L{objects.Instance}
992 @param instance: the instance whose disks we shoul assemble
994 @return: list of (disk_object, device_path)
998 for idx, disk in enumerate(instance.disks):
999 device = _RecursiveFindBD(disk)
1001 raise errors.BlockDeviceError("Block device '%s' is not set up." %
1005 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1007 raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1010 block_devices.append((disk, link_name))
1012 return block_devices
1015 def StartInstance(instance):
1016 """Start an instance.
1018 @type instance: L{objects.Instance}
1019 @param instance: the instance object
1023 running_instances = GetInstanceList([instance.hypervisor])
1025 if instance.name in running_instances:
1026 logging.info("Instance %s already running, not starting", instance.name)
1030 block_devices = _GatherAndLinkBlockDevs(instance)
1031 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1032 hyper.StartInstance(instance, block_devices)
1033 except errors.BlockDeviceError, err:
1034 _Fail("Block device error: %s", err, exc=True)
1035 except errors.HypervisorError, err:
1036 _RemoveBlockDevLinks(instance.name, instance.disks)
1037 _Fail("Hypervisor error: %s", err, exc=True)
1040 def InstanceShutdown(instance, timeout):
1041 """Shut an instance down.
1043 @note: this functions uses polling with a hardcoded timeout.
1045 @type instance: L{objects.Instance}
1046 @param instance: the instance object
1047 @type timeout: integer
1048 @param timeout: maximum timeout for soft shutdown
1052 hv_name = instance.hypervisor
1053 hyper = hypervisor.GetHypervisor(hv_name)
1054 iname = instance.name
1056 if instance.name not in hyper.ListInstances():
1057 logging.info("Instance %s not running, doing nothing", iname)
1062 self.tried_once = False
1065 if iname not in hyper.ListInstances():
1069 hyper.StopInstance(instance, retry=self.tried_once)
1070 except errors.HypervisorError, err:
1071 if iname not in hyper.ListInstances():
1072 # if the instance is no longer existing, consider this a
1073 # success and go to cleanup
1076 _Fail("Failed to stop instance %s: %s", iname, err)
1078 self.tried_once = True
1080 raise utils.RetryAgain()
1083 utils.Retry(_TryShutdown(), 5, timeout)
1084 except utils.RetryTimeout:
1085 # the shutdown did not succeed
1086 logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1089 hyper.StopInstance(instance, force=True)
1090 except errors.HypervisorError, err:
1091 if iname in hyper.ListInstances():
1092 # only raise an error if the instance still exists, otherwise
1093 # the error could simply be "instance ... unknown"!
1094 _Fail("Failed to force stop instance %s: %s", iname, err)
1098 if iname in hyper.ListInstances():
1099 _Fail("Could not shutdown instance %s even by destroy", iname)
1102 hyper.CleanupInstance(instance.name)
1103 except errors.HypervisorError, err:
1104 logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1106 _RemoveBlockDevLinks(iname, instance.disks)
1109 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1110 """Reboot an instance.
1112 @type instance: L{objects.Instance}
1113 @param instance: the instance object to reboot
1114 @type reboot_type: str
1115 @param reboot_type: the type of reboot, one the following
1117 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1118 instance OS, do not recreate the VM
1119 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1120 restart the VM (at the hypervisor level)
1121 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1122 not accepted here, since that mode is handled differently, in
1123 cmdlib, and translates into full stop and start of the
1124 instance (instead of a call_instance_reboot RPC)
1125 @type shutdown_timeout: integer
1126 @param shutdown_timeout: maximum timeout for soft shutdown
1130 running_instances = GetInstanceList([instance.hypervisor])
1132 if instance.name not in running_instances:
1133 _Fail("Cannot reboot instance %s that is not running", instance.name)
1135 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1136 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1138 hyper.RebootInstance(instance)
1139 except errors.HypervisorError, err:
1140 _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1141 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1143 InstanceShutdown(instance, shutdown_timeout)
1144 return StartInstance(instance)
1145 except errors.HypervisorError, err:
1146 _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1148 _Fail("Invalid reboot_type received: %s", reboot_type)
1151 def MigrationInfo(instance):
1152 """Gather information about an instance to be migrated.
1154 @type instance: L{objects.Instance}
1155 @param instance: the instance definition
1158 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1160 info = hyper.MigrationInfo(instance)
1161 except errors.HypervisorError, err:
1162 _Fail("Failed to fetch migration information: %s", err, exc=True)
1166 def AcceptInstance(instance, info, target):
1167 """Prepare the node to accept an instance.
1169 @type instance: L{objects.Instance}
1170 @param instance: the instance definition
1171 @type info: string/data (opaque)
1172 @param info: migration information, from the source node
1173 @type target: string
1174 @param target: target host (usually ip), on this node
1177 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1179 hyper.AcceptInstance(instance, info, target)
1180 except errors.HypervisorError, err:
1181 _Fail("Failed to accept instance: %s", err, exc=True)
1184 def FinalizeMigration(instance, info, success):
1185   """Finalize any preparation to accept an instance.
1187   @type instance: L{objects.Instance}
1188   @param instance: the instance definition
1189   @type info: string/data (opaque)
1190   @param info: migration information, from the source node
1191   @type success: boolean
1192   @param success: whether the migration was a success or a failure
# Cleanup/commit step after migration; the hypervisor decides what "finalize"
# means based on the success flag.
1195   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1197   hyper.FinalizeMigration(instance, info, success)
1198   except errors.HypervisorError, err:
1199     _Fail("Failed to finalize migration: %s", err, exc=True)
1202 def MigrateInstance(instance, target, live):
1203   """Migrates an instance to another node.
1205   @type instance: L{objects.Instance}
1206   @param instance: the instance definition
1207   @type target: string
1208   @param target: the target node name
1210   @param live: whether the migration should be done live or not (the
1211     interpretation of this parameter is left to the hypervisor)
1213   @return: a tuple of (success, msg) where:
1214       - succes is a boolean denoting the success/failure of the operation
1215       - msg is a string with details in case of failure
# Runs on the migration *source* node; the hypervisor implementation performs
# the actual transfer and raises HypervisorError on failure.
1218   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1221   hyper.MigrateInstance(instance, target, live)
1222   except errors.HypervisorError, err:
1223     _Fail("Failed to migrate instance: %s", err, exc=True)
1226 def BlockdevCreate(disk, size, owner, on_primary, info):
1227   """Creates a block device for an instance.
1229   @type disk: L{objects.Disk}
1230   @param disk: the object describing the disk we should create
1232   @param size: the size of the physical underlying device, in MiB
1234   @param owner: the name of the instance for which disk is created,
1235       used for device cache data
1236   @type on_primary: boolean
1237   @param on_primary: indicates if it is the primary node or not
1239   @param info: string that will be sent to the physical device
1240       creation, used for example to set (LVM) tags on LVs
1242   @return: the new unique_id of the device (this can sometime be
1243       computed only after creation), or None. On secondary nodes,
1244       it's not required to return anything.
1247   # TODO: remove the obsolete 'size' argument
1248   # pylint: disable-msg=W0613
# First assemble all children (e.g. the LVs backing a DRBD device), so the
# parent device can be created on top of them.
1251   for child in disk.children:
1253       crdev = _RecursiveAssembleBD(child, owner, on_primary)
1254     except errors.BlockDeviceError, err:
1255       _Fail("Can't assemble device %s: %s", child, err)
1256     if on_primary or disk.AssembleOnSecondary():
1257       # we need the children open in case the device itself has to
1260       # pylint: disable-msg=E1103
1262       except errors.BlockDeviceError, err:
1263         _Fail("Can't make child '%s' read-write: %s", child, err)
1267     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1268   except errors.BlockDeviceError, err:
1269     _Fail("Can't create block device: %s", err)
# On the primary (or when the disk type requires assembly on secondaries too)
# the new device is assembled, opened and registered in the device cache.
1271   if on_primary or disk.AssembleOnSecondary():
1274     except errors.BlockDeviceError, err:
1275       _Fail("Can't assemble device after creation, unusual event: %s", err)
1276     device.SetSyncSpeed(constants.SYNC_SPEED)
1277     if on_primary or disk.OpenOnSecondary():
1279         device.Open(force=True)
1280       except errors.BlockDeviceError, err:
1281         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1282     DevCacheManager.UpdateCache(device.dev_path, owner,
1283                                 on_primary, disk.iv_name)
# SetInfo is used e.g. to tag the underlying storage with the owner data.
1285   device.SetInfo(info)
1287   return device.unique_id
1290 def BlockdevRemove(disk):
1291   """Remove a block device.
1293   @note: This is intended to be called recursively.
1295   @type disk: L{objects.Disk}
1296   @param disk: the disk object we should remove
1298   @return: the success of the operation
1303     rdev = _RecursiveFindBD(disk)
1304   except errors.BlockDeviceError, err:
1305     # probably can't attach
1306     logging.info("Can't attach to device %s in remove", disk)
1307 
1308   if rdev is not None:
1309     r_path = rdev.dev_path
1312     except errors.BlockDeviceError, err:
1313       msgs.append(str(err))
# Drop the cache entry once the device path is gone.
1315       DevCacheManager.RemoveCache(r_path)
# Children are removed recursively; individual failures are collected in
# msgs and reported together at the end instead of aborting early.
1318   for child in disk.children:
1320       BlockdevRemove(child)
1321     except RPCFail, err:
1322       msgs.append(str(err))
1325     _Fail("; ".join(msgs))
1328 def _RecursiveAssembleBD(disk, owner, as_primary):
1329   """Activate a block device for an instance.
1331   This is run on the primary and secondary nodes for an instance.
1333   @note: this function is called recursively.
1335   @type disk: L{objects.Disk}
1336   @param disk: the disk we try to assemble
1338   @param owner: the name of the instance which owns the disk
1339   @type as_primary: boolean
1340   @param as_primary: if we should make the block device
1343   @return: the assembled device or None (in case no device
1345   @raise errors.BlockDeviceError: in case there is an error
1346       during the activation of the children or the device
# mcn ends up being the maximum number of children that may fail to
# assemble while still allowing the parent device to work (e.g. a mirror
# can survive a missing leg).
1352     mcn = disk.ChildrenNeeded()
1354       mcn = 0 # max number of Nones allowed
1356       mcn = len(disk.children) - mcn # max number of Nones
1357   for chld_disk in disk.children:
1359       cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1360     except errors.BlockDeviceError, err:
1361       if children.count(None) >= mcn:
# Too many children already failed: re-raise. Otherwise log and continue
# with a None placeholder for the failed child.
1364         logging.error("Error in child activation (but continuing): %s",
1366       children.append(cdev)
1368   if as_primary or disk.AssembleOnSecondary():
1369     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1370     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1372     if as_primary or disk.OpenOnSecondary():
1374     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1375                                 as_primary, disk.iv_name)
1382 def BlockdevAssemble(disk, owner, as_primary):
1383   """Activate a block device for an instance.
1385   This is a wrapper over _RecursiveAssembleBD.
1387   @rtype: str or boolean
1388   @return: a C{/dev/...} path for primary nodes, and
1389       C{True} for secondary nodes
1393     result = _RecursiveAssembleBD(disk, owner, as_primary)
# On primaries _RecursiveAssembleBD returns a BlockDev instance; flatten it
# to its device path so the RPC result is serializable.
1394     if isinstance(result, bdev.BlockDev):
1395       # pylint: disable-msg=E1103
1396       result = result.dev_path
1397   except errors.BlockDeviceError, err:
1398     _Fail("Error while assembling disk: %s", err, exc=True)
1403 def BlockdevShutdown(disk):
1404   """Shut down a block device.
1406   First, if the device is assembled (Attach() is successful), then
1407   the device is shutdown. Then the children of the device are
1410   This function is called recursively. Note that we don't cache the
1411   children or such, as oppossed to assemble, shutdown of different
1412   devices doesn't require that the upper device was active.
1414   @type disk: L{objects.Disk}
1415   @param disk: the description of the disk we should
1421   r_dev = _RecursiveFindBD(disk)
1422   if r_dev is not None:
1423     r_path = r_dev.dev_path
# Remove the cache entry for the now-inactive device path.
1426       DevCacheManager.RemoveCache(r_path)
1427     except errors.BlockDeviceError, err:
1428       msgs.append(str(err))
# Shut down children recursively; errors are accumulated and reported in
# one RPCFail at the end.
1431   for child in disk.children:
1433       BlockdevShutdown(child)
1434     except RPCFail, err:
1435       msgs.append(str(err))
1438     _Fail("; ".join(msgs))
1441 def BlockdevAddchildren(parent_cdev, new_cdevs):
1442   """Extend a mirrored block device.
1444   @type parent_cdev: L{objects.Disk}
1445   @param parent_cdev: the disk to which we should add children
1446   @type new_cdevs: list of L{objects.Disk}
1447   @param new_cdevs: the list of children which we should add
1451   parent_bdev = _RecursiveFindBD(parent_cdev)
1452   if parent_bdev is None:
1453     _Fail("Can't find parent device '%s' in add children", parent_cdev)
# All children must resolve to active devices before any is attached.
1454   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1455   if new_bdevs.count(None) > 0:
1456     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1457   parent_bdev.AddChildren(new_bdevs)
1460 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1461   """Shrink a mirrored block device.
1463   @type parent_cdev: L{objects.Disk}
1464   @param parent_cdev: the disk from which we should remove children
1465   @type new_cdevs: list of L{objects.Disk}
1466   @param new_cdevs: the list of children which we should remove
1470   parent_bdev = _RecursiveFindBD(parent_cdev)
1471   if parent_bdev is None:
1472     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
# A child is identified either by a static device path (preferred, works
# even if the device is not assembled) or by looking it up as an active
# block device.
1474   for disk in new_cdevs:
1475     rpath = disk.StaticDevPath()
1477       bd = _RecursiveFindBD(disk)
1479         _Fail("Can't find device %s while removing children", disk)
1481         devs.append(bd.dev_path)
1483       if not utils.IsNormAbsPath(rpath):
1484         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1486   parent_bdev.RemoveChildren(devs)
1489 def BlockdevGetmirrorstatus(disks):
1490   """Get the mirroring status of a list of devices.
1492   @type disks: list of L{objects.Disk}
1493   @param disks: the list of disks which we should query
1496       a list of (mirror_done, estimated_time) tuples, which
1497       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1498   @raise errors.BlockDeviceError: if any of the disks cannot be
# One status entry per disk, in input order; a missing device is fatal.
1504     rbd = _RecursiveFindBD(dsk)
1506       _Fail("Can't find device %s", dsk)
1508     stats.append(rbd.CombinedSyncStatus())
1513 def _RecursiveFindBD(disk):
1514   """Check if a device is activated.
1516   If so, return information about the real device.
1518   @type disk: L{objects.Disk}
1519   @param disk: the disk object we need to find
1521   @return: None if the device can't be found,
1522       otherwise the device instance
# Resolve the whole child tree first, then look up the top-level device.
1527   for chdisk in disk.children:
1528     children.append(_RecursiveFindBD(chdisk))
1530   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1533 def _OpenRealBD(disk):
1534   """Opens the underlying block device of a disk.
1536   @type disk: L{objects.Disk}
1537   @param disk: the disk object we want to open
# The disk must already be assembled; otherwise this is an RPC failure.
1540   real_disk = _RecursiveFindBD(disk)
1541   if real_disk is None:
1542     _Fail("Block device '%s' is not set up", disk)
1549 def BlockdevFind(disk):
1550   """Check if a device is activated.
1552   If it is, return information about the real device.
1554   @type disk: L{objects.Disk}
1555   @param disk: the disk to find
1556   @rtype: None or objects.BlockDevStatus
1557   @return: None if the disk cannot be found, otherwise a the current
1562     rbd = _RecursiveFindBD(disk)
1563   except errors.BlockDeviceError, err:
1564     _Fail("Failed to find device: %s", err, exc=True)
# Device was found; report its current sync status to the caller.
1569   return rbd.GetSyncStatus()
1572 def BlockdevGetsize(disks):
1573   """Computes the size of the given disks.
1575   If a disk is not found, returns None instead.
1577   @type disks: list of L{objects.Disk}
1578   @param disks: the list of disk to compute the size for
1580   @return: list with elements None if the disk cannot be found,
# Per-disk lookup errors are tolerated (the result slot stays None) rather
# than failing the whole RPC.
1587       rbd = _RecursiveFindBD(cf)
1588     except errors.BlockDeviceError:
1594     result.append(rbd.GetActualSize())
1598 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1599   """Export a block device to a remote node.
1601   @type disk: L{objects.Disk}
1602   @param disk: the description of the disk to export
1603   @type dest_node: str
1604   @param dest_node: the destination node to export to
1605   @type dest_path: str
1606   @param dest_path: the destination path on the target node
1607   @type cluster_name: str
1608   @param cluster_name: the cluster name, needed for SSH hostalias
1612   real_disk = _OpenRealBD(disk)
1614   # the block size on the read dd is 1MiB to match our units
1615   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1616                                "dd if=%s bs=1048576 count=%s",
1617                                real_disk.dev_path, str(disk.size))
1619   # we set here a smaller block size as, due to ssh buffering, more
1620   # than 64-128k will mostly ignored; we use nocreat to fail if the
1621   # device is not already there or we pass a wrong path; we use
1622   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1623   # to not buffer too much memory; this means that at best, we flush
1624   # every 64k, which will not be very fast
1625   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1626                                 " oflag=dsync", dest_path)
# The remote dd runs over SSH as the Ganeti run-as user on the target node.
1628   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1629                                                    constants.GANETI_RUNAS,
1632   # all commands have been checked, so we're safe to combine them
1633   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1635   result = utils.RunCmd(["bash", "-c", command])
1638     _Fail("Disk copy command '%s' returned error: %s"
1639           " output: %s", command, result.fail_reason, result.output)
1642 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1643   """Write a file to the filesystem.
1645   This allows the master to overwrite(!) a file. It will only perform
1646   the operation if the file belongs to a list of configuration files.
1648   @type file_name: str
1649   @param file_name: the target file name
1651   @param data: the new contents of the file
1653   @param mode: the mode to give the file (can be None)
1655   @param uid: the owner of the file (can be -1 for default)
1657   @param gid: the group of the file (can be -1 for default)
1659   @param atime: the atime to set on the file (can be None)
1661   @param mtime: the mtime to set on the file (can be None)
# Security check: only whitelisted absolute paths may be overwritten
# (see _ALLOWED_UPLOAD_FILES in the module docstring).
1665   if not os.path.isabs(file_name):
1666     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1668   if file_name not in _ALLOWED_UPLOAD_FILES:
1669     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
# Payload arrives compressed over the wire; decompress before writing.
1672   raw_data = _Decompress(data)
1674   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1675                   atime=atime, mtime=mtime)
1678 def WriteSsconfFiles(values):
1679   """Update all ssconf files.
1681   Wrapper around the SimpleStore.WriteFiles.
1684   ssconf.SimpleStore().WriteFiles(values)
1687 def _ErrnoOrStr(err):
1688   """Format an EnvironmentError exception.
1690   If the L{err} argument has an errno attribute, it will be looked up
1691   and converted into a textual C{E...} description. Otherwise the
1692   string representation of the error will be returned.
1694   @type err: L{EnvironmentError}
1695   @param err: the exception to format
1698   if hasattr(err, 'errno'):
# NOTE(review): errno.errorcode[...] raises KeyError for errno values not
# in the mapping — confirm a fallback exists for unknown codes.
1699     detail = errno.errorcode[err.errno]
1705 def _OSOndiskAPIVersion(os_dir):
1706   """Compute and return the API version of a given OS.
1708   This function will try to read the API version of the OS residing in
1709   the 'os_dir' directory.
1712   @param os_dir: the directory in which we should look for the OS
1714   @return: tuple (status, data) with status denoting the validity and
1715       data holding either the vaid versions or an error message
1718   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
# The API file must exist and be a regular file; every failure path returns
# (False, <error message>) rather than raising.
1721     st = os.stat(api_file)
1722   except EnvironmentError, err:
1723     return False, ("Required file '%s' not found under path %s: %s" %
1724                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1726   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1727     return False, ("File '%s' in %s is not a regular file" %
1728                    (constants.OS_API_FILE, os_dir))
1731     api_versions = utils.ReadFile(api_file).splitlines()
1732   except EnvironmentError, err:
1733     return False, ("Error while reading the API version file at %s: %s" %
1734                    (api_file, _ErrnoOrStr(err)))
# One integer version per line in the API file.
1737     api_versions = [int(version.strip()) for version in api_versions]
1738   except (TypeError, ValueError), err:
1739     return False, ("API version(s) can't be converted to integer: %s" %
1742   return True, api_versions
1745 def DiagnoseOS(top_dirs=None):
1746   """Compute the validity for all OSes.
1748   @type top_dirs: list
1749   @param top_dirs: the list of directories in which to
1750       search (if not given defaults to
1751       L{constants.OS_SEARCH_PATH})
1752   @rtype: list of L{objects.OS}
1753   @return: a list of tuples (name, path, status, diagnose, variants,
1754       parameters, api_version) for all (potential) OSes under all
1755       search paths, where:
1756           - name is the (potential) OS name
1757           - path is the full path to the OS
1758           - status True/False is the validity of the OS
1759           - diagnose is the error message for an invalid OS, otherwise empty
1760           - variants is a list of supported OS variants, if any
1761           - parameters is a list of (name, help) parameters, if any
1762           - api_version is a list of support OS API versions
1765   if top_dirs is None:
1766     top_dirs = constants.OS_SEARCH_PATH
# Scan every search directory; unreadable directories are logged and
# skipped rather than failing the whole diagnosis.
1769   for dir_name in top_dirs:
1770     if os.path.isdir(dir_name):
1772         f_names = utils.ListVisibleFiles(dir_name)
1773       except EnvironmentError, err:
1774         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1776       for name in f_names:
1777         os_path = utils.PathJoin(dir_name, name)
1778         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
# On success os_inst is an objects.OS; on failure it is the diagnose string
# and the OS metadata lists stay empty.
1781           variants = os_inst.supported_variants
1782           parameters = os_inst.supported_parameters
1783           api_versions = os_inst.api_versions
1786           variants = parameters = api_versions = []
1787         result.append((name, os_path, status, diagnose, variants,
1788                        parameters, api_versions))
1793 def _TryOSFromDisk(name, base_dir=None):
1794   """Create an OS instance from disk.
1796   This function will return an OS instance if the given name is a
1799   @type base_dir: string
1800   @keyword base_dir: Base directory containing OS installations.
1801                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1803   @return: success and either the OS instance if we find a valid one,
1807   if base_dir is None:
1808     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1810     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1813     return False, "Directory for OS %s not found in search path" % name
1815   status, api_versions = _OSOndiskAPIVersion(os_dir)
1818     return status, api_versions
# The OS must support at least one of the node's API versions.
1820   if not constants.OS_API_VERSIONS.intersection(api_versions):
1821     return False, ("API version mismatch for path '%s': found %s, want %s." %
1822                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1824   # OS Files dictionary, we will populate it with the absolute path names
1825   os_files = dict.fromkeys(constants.OS_SCRIPTS)
# Newer API versions add optional metadata files to the required set.
1827   if max(api_versions) >= constants.OS_API_V15:
1828     os_files[constants.OS_VARIANTS_FILE] = ''
1830   if max(api_versions) >= constants.OS_API_V20:
1831     os_files[constants.OS_PARAMETERS_FILE] = ''
1833     del os_files[constants.OS_SCRIPT_VERIFY]
# Validate each required file: present, regular, and (for scripts)
# executable by the owner.
1835   for filename in os_files:
1836     os_files[filename] = utils.PathJoin(os_dir, filename)
1839       st = os.stat(os_files[filename])
1840     except EnvironmentError, err:
1841       return False, ("File '%s' under path '%s' is missing (%s)" %
1842                      (filename, os_dir, _ErrnoOrStr(err)))
1844     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1845       return False, ("File '%s' under path '%s' is not a regular file" %
1848     if filename in constants.OS_SCRIPTS:
1849       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1850         return False, ("File '%s' under path '%s' is not executable" %
1854   if constants.OS_VARIANTS_FILE in os_files:
1855     variants_file = os_files[constants.OS_VARIANTS_FILE]
1857       variants = utils.ReadFile(variants_file).splitlines()
1858     except EnvironmentError, err:
1859       return False, ("Error while reading the OS variants file at %s: %s" %
1860                      (variants_file, _ErrnoOrStr(err)))
1862       return False, ("No supported os variant found")
1865   if constants.OS_PARAMETERS_FILE in os_files:
1866     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1868       parameters = utils.ReadFile(parameters_file).splitlines()
1869     except EnvironmentError, err:
1870       return False, ("Error while reading the OS parameters file at %s: %s" %
1871                      (parameters_file, _ErrnoOrStr(err)))
# Each parameters line is "name help-text"; split on first whitespace.
1872     parameters = [v.split(None, 1) for v in parameters]
1874   os_obj = objects.OS(name=name, path=os_dir,
1875                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1876                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1877                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1878                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1879                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1881                       supported_variants=variants,
1882                       supported_parameters=parameters,
1883                       api_versions=api_versions)
1887 def OSFromDisk(name, base_dir=None):
1888   """Create an OS instance from disk.
1890   This function will return an OS instance if the given name is a
1891   valid OS name. Otherwise, it will raise an appropriate
1892   L{RPCFail} exception, detailing why this is not a valid OS.
1894   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1895   an exception but returns true/false status data.
1897   @type base_dir: string
1898   @keyword base_dir: Base directory containing OS installations.
1899                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1900   @rtype: L{objects.OS}
1901   @return: the OS instance if we find a valid one
1902   @raise RPCFail: if we don't find a valid OS
# The OS name may include a "+variant" suffix; strip it for the disk lookup.
1905   name_only = objects.OS.GetName(name)
1906   status, payload = _TryOSFromDisk(name_only, base_dir)
1914 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
1915   """Calculate the basic environment for an os script.
1918   @param os_name: full operating system name (including variant)
1919   @type inst_os: L{objects.OS}
1920   @param inst_os: operating system for which the environment is being built
1921   @type os_params: dict
1922   @param os_params: the OS parameters
1923   @type debug: integer
1924   @param debug: debug level (0 or 1, for OS Api 10)
1926   @return: dict of environment variables
1927   @raise errors.BlockDeviceError: if the block device
# Use the highest API version supported by both node and OS.
1933     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1934   result['OS_API_VERSION'] = '%d' % api_version
1935   result['OS_NAME'] = inst_os.name
1936   result['DEBUG_LEVEL'] = '%d' % debug
1939   if api_version >= constants.OS_API_V15:
# Variant: taken from the name's "+variant" suffix, else the OS default.
1940     variant = objects.OS.GetVariant(os_name)
1942       variant = inst_os.supported_variants[0]
1943     result['OS_VARIANT'] = variant
# Export every OS parameter as an OSP_<NAME> environment variable.
1946   for pname, pvalue in os_params.items():
1947     result['OSP_%s' % pname.upper()] = pvalue
1952 def OSEnvironment(instance, inst_os, debug=0):
1953   """Calculate the environment for an os script.
1955   @type instance: L{objects.Instance}
1956   @param instance: target instance for the os script run
1957   @type inst_os: L{objects.OS}
1958   @param inst_os: operating system for which the environment is being built
1959   @type debug: integer
1960   @param debug: debug level (0 or 1, for OS Api 10)
1962   @return: dict of environment variables
1963   @raise errors.BlockDeviceError: if the block device
# Start from the generic OS environment and add per-instance data.
1967   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
1969   result['INSTANCE_NAME'] = instance.name
1970   result['INSTANCE_OS'] = instance.os
1971   result['HYPERVISOR'] = instance.hypervisor
1972   result['DISK_COUNT'] = '%d' % len(instance.disks)
1973   result['NIC_COUNT'] = '%d' % len(instance.nics)
# Per-disk variables; _OpenRealBD requires each disk to be assembled.
1976   for idx, disk in enumerate(instance.disks):
1977     real_disk = _OpenRealBD(disk)
1978     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1979     result['DISK_%d_ACCESS' % idx] = disk.mode
1980     if constants.HV_DISK_TYPE in instance.hvparams:
1981       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1982         instance.hvparams[constants.HV_DISK_TYPE]
1983     if disk.dev_type in constants.LDS_BLOCK:
1984       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1985     elif disk.dev_type == constants.LD_FILE:
1986       result['DISK_%d_BACKEND_TYPE' % idx] = \
1987         'file:%s' % disk.physical_id[0]
# Per-NIC variables; BRIDGE is only set for bridged mode, LINK whenever
# the nicparams define a link.
1990   for idx, nic in enumerate(instance.nics):
1991     result['NIC_%d_MAC' % idx] = nic.mac
1993       result['NIC_%d_IP' % idx] = nic.ip
1994     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1995     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1996       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1997     if nic.nicparams[constants.NIC_LINK]:
1998       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1999     if constants.HV_NIC_TYPE in instance.hvparams:
2000       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2001         instance.hvparams[constants.HV_NIC_TYPE]
# Flatten backend and hypervisor parameters into INSTANCE_BE_*/INSTANCE_HV_*.
2004   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2005     for key, value in source.items():
2006       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2011 def BlockdevGrow(disk, amount):
2012   """Grow a stack of block devices.
2014   This function is called recursively, with the childrens being the
2015   first ones to resize.
2017   @type disk: L{objects.Disk}
2018   @param disk: the disk to be grown
2019   @rtype: (status, result)
2020   @return: a tuple with the status of the operation
2021       (True/False), and the errors message if status
# The device must be active; growing an unassembled disk is an RPC failure.
2025   r_dev = _RecursiveFindBD(disk)
2027     _Fail("Cannot find block device %s", disk)
2031   except errors.BlockDeviceError, err:
2032     _Fail("Failed to grow block device: %s", err, exc=True)
2035 def BlockdevSnapshot(disk):
2036   """Create a snapshot copy of a block device.
2038   This function is called recursively, and the snapshot is actually created
2039   just for the leaf lvm backend device.
2041   @type disk: L{objects.Disk}
2042   @param disk: the disk to be snapshotted
2044   @return: snapshot disk path
# DRBD devices are snapshotted via their first (LVM) child; only plain LVs
# support snapshots directly.
2047   if disk.dev_type == constants.LD_DRBD8:
2048     if not disk.children:
2049       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2051     return BlockdevSnapshot(disk.children[0])
2052   elif disk.dev_type == constants.LD_LV:
2053     r_dev = _RecursiveFindBD(disk)
2054     if r_dev is not None:
2055       # FIXME: choose a saner value for the snapshot size
2056       # let's stay on the safe side and ask for the full size, for now
2057       return r_dev.Snapshot(disk.size)
2059       _Fail("Cannot find block device %s", disk)
2061     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2062           disk.unique_id, disk.dev_type)
2065 def FinalizeExport(instance, snap_disks):
2066   """Write out the export configuration information.
2068   @type instance: L{objects.Instance}
2069   @param instance: the instance which we export, used for
2070       saving configuration
2071   @type snap_disks: list of L{objects.Disk}
2072   @param snap_disks: list of snapshot block devices, which
2073       will be used to get the actual name of the dump file
# The export is staged in "<name>.new" and atomically moved into place at
# the end, so a crash cannot leave a half-written export under the final name.
2078   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2079   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2081   config = objects.SerializableConfigParser()
2083   config.add_section(constants.INISECT_EXP)
2084   config.set(constants.INISECT_EXP, 'version', '0')
2085   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2086   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2087   config.set(constants.INISECT_EXP, 'os', instance.os)
2088   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2090   config.add_section(constants.INISECT_INS)
2091   config.set(constants.INISECT_INS, 'name', instance.name)
2092   config.set(constants.INISECT_INS, 'memory', '%d' %
2093              instance.beparams[constants.BE_MEMORY])
2094   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2095              instance.beparams[constants.BE_VCPUS])
2096   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2097   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2100   for nic_count, nic in enumerate(instance.nics):
2102     config.set(constants.INISECT_INS, 'nic%d_mac' %
2103                nic_count, '%s' % nic.mac)
2104     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2105     for param in constants.NICS_PARAMETER_TYPES:
2106       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2107                  '%s' % nic.nicparams.get(param, None))
2108   # TODO: redundant: on load can read nics until it doesn't exist
2109   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2112   for disk_count, disk in enumerate(snap_disks):
2115       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2116                  ('%s' % disk.iv_name))
2117       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2118                  ('%s' % disk.physical_id[1]))
2119       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2122   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2124   # New-style hypervisor/backend parameters
# Cluster-global hypervisor parameters are deliberately excluded from the
# export, since they don't belong to the instance.
2126   config.add_section(constants.INISECT_HYP)
2127   for name, value in instance.hvparams.items():
2128     if name not in constants.HVC_GLOBALS:
2129       config.set(constants.INISECT_HYP, name, str(value))
2131   config.add_section(constants.INISECT_BEP)
2132   for name, value in instance.beparams.items():
2133     config.set(constants.INISECT_BEP, name, str(value))
2135   config.add_section(constants.INISECT_OSP)
2136   for name, value in instance.osparams.items():
2137     config.set(constants.INISECT_OSP, name, str(value))
2139   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2140                   data=config.Dumps())
# Replace any previous export for this instance, then commit the new one.
2141   shutil.rmtree(finaldestdir, ignore_errors=True)
2142   shutil.move(destdir, finaldestdir)
2145 def ExportInfo(dest):
2146   """Get export configuration information.
2149   @param dest: directory containing the export
2151   @rtype: L{objects.SerializableConfigParser}
2152   @return: a serializable config file containing the
2156   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2158   config = objects.SerializableConfigParser()
# A valid export must contain both the export and the instance sections.
2161   if (not config.has_section(constants.INISECT_EXP) or
2162       not config.has_section(constants.INISECT_INS)):
2163     _Fail("Export info file doesn't have the required fields")
2165   return config.Dumps()
2169   """Return a list of exports currently available on this machine.
2172   @return: list of the exports
# Export names are the visible entries of EXPORT_DIR, sorted; a missing
# export directory is an RPC failure rather than an empty list.
2175   if os.path.isdir(constants.EXPORT_DIR):
2176     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2178     _Fail("No exports directory")
2181 def RemoveExport(export):
2182   """Remove an existing export from the node.
2185   @param export: the name of the export to remove
# PathJoin keeps the target inside EXPORT_DIR (it rejects escaping paths).
2189   target = utils.PathJoin(constants.EXPORT_DIR, export)
2192     shutil.rmtree(target)
2193   except EnvironmentError, err:
2194     _Fail("Error while removing the export: %s", err, exc=True)
2197 def BlockdevRename(devlist):
2198   """Rename a list of block devices.
2200   @type devlist: list of tuples
2201   @param devlist: list of tuples of the form  (disk,
2202       new_logical_id, new_physical_id); disk is an
2203       L{objects.Disk} object describing the current disk,
2204       and new logical_id/physical_id is the name we
2207   @return: True if all renames succeeded, False otherwise
# Failures are collected per device and reported in a single RPCFail at
# the end, so one bad rename doesn't stop the rest.
2212   for disk, unique_id in devlist:
2213     dev = _RecursiveFindBD(disk)
2215       msgs.append("Can't find device %s in rename" % str(disk))
2219       old_rpath = dev.dev_path
2220       dev.Rename(unique_id)
2221       new_rpath = dev.dev_path
2222       if old_rpath != new_rpath:
2223         DevCacheManager.RemoveCache(old_rpath)
2224         # FIXME: we should add the new cache information here, like:
2225         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2226         # but we don't have the owner here - maybe parse from existing
2227         # cache? for now, we only lose lvm data when we rename, which
2228         # is less critical than DRBD or MD
2229     except errors.BlockDeviceError, err:
2230       msgs.append("Can't rename device '%s' to '%s': %s" %
2231                   (dev, unique_id, err))
2232       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2235     _Fail("; ".join(msgs))
2238 def _TransformFileStorageDir(file_storage_dir):
2239   """Checks whether given file_storage_dir is valid.
2241   Checks wheter the given file_storage_dir is within the cluster-wide
2242   default file_storage_dir stored in SimpleStore. Only paths under that
2243   directory are allowed.
2245   @type file_storage_dir: str
2246   @param file_storage_dir: the path to check
2248   @return: the normalized path if valid, None otherwise
2251   if not constants.ENABLE_FILE_STORAGE:
2252     _Fail("File storage disabled at configure time")
2254   file_storage_dir = os.path.normpath(file_storage_dir)
# NOTE(review): `cfg` is presumably a config/ssconf accessor obtained just
# above this line — verify it is defined before this point in the file.
2255   base_file_storage_dir = cfg.GetFileStorageDir()
# commonprefix-based containment check: the normalized path must live
# under the cluster's base file storage directory.
2256   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2257       base_file_storage_dir):
2258     _Fail("File storage directory '%s' is not under base file"
2259           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2260   return file_storage_dir
2263 def CreateFileStorageDir(file_storage_dir):
2264   """Create file storage directory.
2266   @type file_storage_dir: str
2267   @param file_storage_dir: directory to create
2270   @return: tuple with first element a boolean indicating wheter dir
2271       creation was successful or not
# Path is validated against the cluster base dir before any mkdir.
2274   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2275   if os.path.exists(file_storage_dir):
2276     if not os.path.isdir(file_storage_dir):
2277       _Fail("Specified storage dir '%s' is not a directory",
# Created with mode 0750: owner rwx, group rx, no world access.
2281       os.makedirs(file_storage_dir, 0750)
2282     except OSError, err:
2283       _Fail("Cannot create file storage directory '%s': %s",
2284             file_storage_dir, err, exc=True)
2287 def RemoveFileStorageDir(file_storage_dir):
2288   """Remove file storage directory.
2290   Remove it only if it's empty. If not log an error and return.
2292   @type file_storage_dir: str
2293   @param file_storage_dir: the directory we should cleanup
2294   @rtype: tuple (success,)
2295   @return: tuple of one element, C{success}, denoting
2296       whether the operation was successful
2299   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2300   if os.path.exists(file_storage_dir):
2301     if not os.path.isdir(file_storage_dir):
2302       _Fail("Specified Storage directory '%s' is not a directory",
2304     # deletes dir only if empty, otherwise we want to fail the rpc call
# os.rmdir refuses to delete a non-empty directory, which is exactly the
# safety behaviour wanted here.
2306       os.rmdir(file_storage_dir)
2307     except OSError, err:
2308       _Fail("Cannot remove file storage directory '%s': %s",
2309             file_storage_dir, err)
2312 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2313 """Rename the file storage directory.
2315 @type old_file_storage_dir: str
2316 @param old_file_storage_dir: the current path
2317 @type new_file_storage_dir: str
2318 @param new_file_storage_dir: the name we should rename to
2319 @rtype: tuple (success,)
2320 @return: tuple of one element, C{success}, denoting
2321 whether the operation was successful
2324 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2325 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2326 if not os.path.exists(new_file_storage_dir):
2327 if os.path.isdir(old_file_storage_dir):
2329 os.rename(old_file_storage_dir, new_file_storage_dir)
2330 except OSError, err:
2331 _Fail("Cannot rename '%s' to '%s': %s",
2332 old_file_storage_dir, new_file_storage_dir, err)
2334 _Fail("Specified storage dir '%s' is not a directory",
2335 old_file_storage_dir)
2337 if os.path.exists(old_file_storage_dir):
2338 _Fail("Cannot rename '%s' to '%s': both locations exist",
2339 old_file_storage_dir, new_file_storage_dir)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check

  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # string-prefix containment check against the queue directory
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @param content: the new job contents

  @return: the success of the operation

  """
  # refuse paths outside the queue directory before touching anything
  _EnsureJobQueueFile(file_name)

  ents = runtime.GetEnts()
  payload = _Decompress(content)

  # atomic write-and-replace, owned by the master daemon
  utils.WriteFile(file_name, data=payload,
                  uid=ents.masterd_uid, gid=ents.masterd_gid)
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @param old: the old (actual) file name
  @param new: the desired file name

  @return: the success of the operation and payload

  """
  # both source and destination must live inside the queue directory
  for path in (old, new):
    _EnsureJobQueueFile(path)

  utils.RenameFile(old, new, mkdir=True)
2400 def BlockdevClose(instance_name, disks):
2401 """Closes the given block devices.
2403 This means they will be switched to secondary mode (in case of
2406 @param instance_name: if the argument is not empty, the symlinks
2407 of this instance will be removed
2408 @type disks: list of L{objects.Disk}
2409 @param disks: the list of disks to be closed
2410 @rtype: tuple (success, message)
2411 @return: a tuple of success and message, where success
2412 indicates the succes of the operation, and message
2413 which will contain the error details in case we
2419 rd = _RecursiveFindBD(cf)
2421 _Fail("Can't find device %s", cf)
2428 except errors.BlockDeviceError, err:
2429 msg.append(str(err))
2431 _Fail("Can't make devices secondary: %s", ",".join(msg))
2434 _RemoveBlockDevLinks(instance_name, disks)
2437 def ValidateHVParams(hvname, hvparams):
2438 """Validates the given hypervisor parameters.
2440 @type hvname: string
2441 @param hvname: the hypervisor name
2442 @type hvparams: dict
2443 @param hvparams: the hypervisor parameters to be validated
2448 hv_type = hypervisor.GetHypervisor(hvname)
2449 hv_type.ValidateParameters(hvparams)
2450 except errors.HypervisorError, err:
2451 _Fail(str(err), log=False)
2454 def _CheckOSPList(os_obj, parameters):
2455 """Check whether a list of parameters is supported by the OS.
2457 @type os_obj: L{objects.OS}
2458 @param os_obj: OS object to check
2459 @type parameters: list
2460 @param parameters: the list of parameters to check
2463 supported = [v[0] for v in os_obj.supported_parameters]
2464 delta = frozenset(parameters).difference(supported)
2466 _Fail("The following parameters are not supported"
2467 " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failing or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      # tbv holds the error message when status is False
      _Fail(tbv)
    else:
      return False

  # OS API versions before 2.0 have no validation support
  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True
2519 """Demotes the current node from master candidate role.
2522 # try to ensure we're not the master by mistake
2523 master, myself = ssconf.GetMasterAndMyself()
2524 if master == myself:
2525 _Fail("ssconf status shows I'm the master node, will not demote")
2527 result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2528 if not result.failed:
2529 _Fail("The master daemon is running, will not demote")
2532 if os.path.isfile(constants.CLUSTER_CONF_FILE):
2533 utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2534 except EnvironmentError, err:
2535 if err.errno != errno.ENOENT:
2536 _Fail("Error while backing up cluster file: %s", err, exc=True)
2538 utils.RemoveFile(constants.CLUSTER_CONF_FILE)
def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  @type cryptodir: string
  @param cryptodir: base crypto directory
  @type name: string
  @param name: certificate (subdirectory) name
  @rtype: tuple; (string, string, string)
  @return: certificate directory, key file path, certificate file path

  """
  key_file = utils.PathJoin(cryptodir, name, _X509_KEY_FILE)
  cert_file = utils.PathJoin(cryptodir, name, _X509_CERT_FILE)
  return (utils.PathJoin(cryptodir, name), key_file, cert_file)
2550 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2551 """Creates a new X509 certificate for SSL/TLS.
2554 @param validity: Validity in seconds
2555 @rtype: tuple; (string, string)
2556 @return: Certificate name and public part
2559 (key_pem, cert_pem) = \
2560 utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2561 min(validity, _MAX_SSL_CERT_VALIDITY))
2563 cert_dir = tempfile.mkdtemp(dir=cryptodir,
2564 prefix="x509-%s-" % utils.TimestampForFilename())
2566 name = os.path.basename(cert_dir)
2567 assert len(name) > 5
2569 (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2571 utils.WriteFile(key_file, mode=0400, data=key_pem)
2572 utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2574 # Never return private key as it shouldn't leave the node
2575 return (name, cert_pem)
2577 shutil.rmtree(cert_dir, ignore_errors=True)
2581 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2582 """Removes a X509 certificate.
2585 @param name: Certificate name
2588 (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2590 utils.RemoveFile(key_file)
2591 utils.RemoveFile(cert_file)
2595 except EnvironmentError, err:
2596 _Fail("Cannot remove certificate directory '%s': %s",
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  # NOTE(review): several lines are missing from this excerpt (the
  # env/prefix/suffix/exp_size initializations, a "(disk, ) = ieargs"
  # unpacking in the raw-disk branch, some try/else lines and argument
  # continuations) -- verify against the full source.

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    directory = os.path.normpath(os.path.dirname(filename))

    # plain string-prefix containment check against the exports directory
    if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
        constants.EXPORT_DIR):
      _Fail("File '%s' is not under exports directory '%s'",
            filename, constants.EXPORT_DIR)

    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    # imports redirect the pipeline into the file, exports read from it
    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

    # Retrieve file size
      st = os.stat(filename)
    except EnvironmentError, err:
      logging.error("Can't stat(2) %s: %s", filename, err)
      exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly ignored; we use nocreat to fail if the device
      # is not already there or we pass a wrong path; we use notrunc to no
      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
      # much memory; this means that at best, we flush every 64k, which will
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   str(1024 * 1024), # 1 MB

      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    # the OS import/export scripts get the device and index via the env
    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

      # Let script predict size
      exp_size = constants.IE_CUSTOM_SIZE

    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  @type prefix: string
  @param prefix: directory name prefix
  @rtype: string
  @return: path of the newly created temporary status directory

  """
  # the timestamp makes concurrent daemons for the same instance unique
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))
def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/output mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @param host: Remote host for export (None for import)
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  # NOTE(review): several lines are missing from this excerpt (the
  # "prefix" selection, the "cmd" list opener, a few try/else lines and
  # continuations) -- verify against the full source.

  # sanity checks: imports take no host/port, exports need both
  if mode == constants.IEM_IMPORT:

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

    _Fail("Invalid mode %r", mode)

  # the cluster certificate serves as both key and CA, so the two
  # options must be given (or omitted) together
  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem (the node daemon certificate) for key and cert
    key_path = constants.NODED_CERT_FILE
    cert_path = constants.NODED_CERT_FILE
    assert opts.ca_pem is None
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir(prefix)
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem as the CA as well
      ca = utils.ReadFile(constants.NODED_CERT_FILE)

    # 0400: the CA copy is private to the daemon
    utils.WriteFile(ca_file, data=ca, mode=0400)

      constants.IMPORT_EXPORT_DAEMON,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,

      cmd.append("--host=%s" % host)

      cmd.append("--port=%s" % port)

      cmd.append("--compress=%s" % opts.compress)

      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

      cmd.append("--cmd-prefix=%s" % cmd_prefix)

      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    logfile = _InstanceLogName(prefix, instance.os, instance.name)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

    shutil.rmtree(status_dir, ignore_errors=True)
2830 def GetImportExportStatus(names):
2831 """Returns import/export daemon status.
2833 @type names: sequence
2834 @param names: List of names
2835 @rtype: List of dicts
2836 @return: Returns a list of the state of each named import/export or None if a
2837 status couldn't be read
2843 status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2847 data = utils.ReadFile(status_file)
2848 except EnvironmentError, err:
2849 if err.errno != errno.ENOENT:
2857 result.append(serializer.LoadJson(data))
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  @type name: string
  @param name: Import/export (status directory) name

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  # a falsy pid means the daemon is not (or no longer) running
  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  @type name: string
  @param name: Import/export (status directory) name

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    # don't wait for the process; removing the directory is enough
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs
2917 def DrbdDisconnectNet(nodes_ip, disks):
2918 """Disconnects the network on a list of drbd devices.
2921 bdevs = _FindDisks(nodes_ip, disks)
2927 except errors.BlockDeviceError, err:
2928 _Fail("Can't change network configuration to standalone mode: %s",
2932 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2933 """Attaches the network on a list of drbd devices.
2936 bdevs = _FindDisks(nodes_ip, disks)
2939 for idx, rd in enumerate(bdevs):
2941 _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2942 except EnvironmentError, err:
2943 _Fail("Can't create symlink: %s", err)
2944 # reconnect disks, switch to new master configuration and if
2945 # needed primary mode
2948 rd.AttachNet(multimaster)
2949 except errors.BlockDeviceError, err:
2950 _Fail("Can't change network configuration: %s", err)
2952 # wait until the disks are connected; we need to retry the re-attach
2953 # if the device becomes standalone, as this might happen if the one
2954 # node disconnects and reconnects in a different mode before the
2955 # other node reconnects; in this case, one or both of the nodes will
2956 # decide it has wrong configuration and switch to standalone
2959 all_connected = True
2962 stats = rd.GetProcStatus()
2964 all_connected = (all_connected and
2965 (stats.is_connected or stats.is_in_resync))
2967 if stats.is_standalone:
2968 # peer had different config info and this node became
2969 # standalone, even though this should not happen with the
2970 # new staged way of changing disk configs
2972 rd.AttachNet(multimaster)
2973 except errors.BlockDeviceError, err:
2974 _Fail("Can't change network configuration: %s", err)
2976 if not all_connected:
2977 raise utils.RetryAgain()
2980 # Start with a delay of 100 miliseconds and go up to 5 seconds
2981 utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2982 except utils.RetryTimeout:
2983 _Fail("Timeout in disk reconnecting")
2986 # change to primary mode
2990 except errors.BlockDeviceError, err:
2991 _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @rtype: tuple (alldone, min_resync)
  @return: whether all devices have finished resyncing, and the
      minimum sync percentage over all devices

  """
  def _helper(rd):
    # retry while the device is neither connected nor resyncing
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check before failing
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
3024 def GetDrbdUsermodeHelper():
3025 """Returns DRBD usermode helper currently configured.
3029 return bdev.BaseDRBD.GetUsermodeHelper()
3030 except errors.BlockDeviceError, err:
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # parent: return to the caller immediately
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running on ram
  try:
    utils.Mlockall()
  except Exception: # pylint: disable-msg=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()
class HooksRunner(object):
  """Hooks runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script
    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    # the phase selects the "<hpath>-pre.d" or "<hpath>-post.d" subdir
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # the input data is passed via a temporary file
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
3180 class DevCacheManager(object):
3181 """Simple class for managing a cache of block device information.
3184 _DEV_PREFIX = "/dev/"
3185 _ROOT_DIR = constants.BDEV_CACHE_DIR
3188 def _ConvertPath(cls, dev_path):
3189 """Converts a /dev/name path to the cache file name.
3191 This replaces slashes with underscores and strips the /dev
3192 prefix. It then returns the full path to the cache file.
3195 @param dev_path: the C{/dev/} path name
3197 @return: the converted path name
3200 if dev_path.startswith(cls._DEV_PREFIX):
3201 dev_path = dev_path[len(cls._DEV_PREFIX):]
3202 dev_path = dev_path.replace("/", "_")
3203 fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3207 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3208 """Updates the cache information for a given device.
3211 @param dev_path: the pathname of the device
3213 @param owner: the owner (instance name) of the device
3214 @type on_primary: bool
3215 @param on_primary: whether this is the primary
3218 @param iv_name: the instance-visible name of the
3219 device, as in objects.Disk.iv_name
3224 if dev_path is None:
3225 logging.error("DevCacheManager.UpdateCache got a None dev_path")
3227 fpath = cls._ConvertPath(dev_path)
3233 iv_name = "not_visible"
3234 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3236 utils.WriteFile(fpath, data=fdata)
3237 except EnvironmentError, err:
3238 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3241 def RemoveCache(cls, dev_path):
3242 """Remove data for a dev_path.
3244 This is just a wrapper over L{utils.RemoveFile} with a converted
3245 path name and logging.
3248 @param dev_path: the pathname of the device
3253 if dev_path is None:
3254 logging.error("DevCacheManager.RemoveCache got a None dev_path")
3256 fpath = cls._ConvertPath(dev_path)
3258 utils.RemoveFile(fpath)
3259 except EnvironmentError, err:
3260 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)