4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25 the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27 in the L{_CleanDirectory} function
31 # pylint: disable=E1103
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
#: Path to the kernel's boot ID; its content changes on every boot and is
#: used to detect node reboots
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"

#: Safety whitelist: the only directories L{_CleanDirectory} may operate on
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.CRYPTO_KEYS_DIR,

#: Maximum validity of generated X509 certificates, in seconds (one week)
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60

#: Base file names used for import/export daemon key/cert/status files
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """
def _Fail(msg, *args, **kwargs):
  """Log an error and the raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @param msg: the text of the exception
  @param args: optional %-style interpolation arguments for C{msg}
  @keyword log: if False, suppress logging of this error (default: log)
  @keyword exc: if True, log the error together with the current traceback

  """
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  # A fresh instance is returned on every call; no caching is done here
  return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client; a two-element (encoding, content)
      sequence
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # Content is base64-wrapped zlib-compressed data
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @param path: the directory to clean
  @param exclude: list of files to be excluded, defaults
      to no exclusions

  """
  # Safety check: refuse to touch anything outside the whitelist
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",

  if not os.path.isdir(path):

  # Normalize excluded paths
  exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
    # Only plain files are removed; symlinks and directories are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  @rtype: frozenset
  @return: the set of file paths that L{UploadFile} may write to

  """
  # Static cluster-wide files ...
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.SPICE_CERT_FILE,
    constants.SPICE_CACERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,

  # ... plus any ancillary files each hypervisor type declares
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed_files)


#: Files accepted by L{UploadFile}; computed once at module import time
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  # Keep the queue lock file; everything else in the queue dirs goes
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family
  @raise RPCFail: in case of errors

  """
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    # Re-raised as RPCFail so the caller gets a proper 'failed' result
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family)
def ActivateMasterIp():
  """Activate the IP address of the master daemon.

  @raise RPCFail: if the IP cannot be activated

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if netutils.IPAddress.Own(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
      err_msg = "Someone else has the master ip, not activating"
      logging.error(err_msg)
    # Pick the address class matching the cluster's primary IP family
    ipcls = netutils.IP4Address
    if family == netutils.IP6Address.family:
      ipcls = netutils.IP6Address

    result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                           "%s/%d" % (master_ip, ipcls.iplen),
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
      err_msg = "Can't activate master IP: %s" % result.output
      logging.error(err_msg)

    # we ignore the exit code of the following cmds
    if ipcls == netutils.IP4Address:
      # Gratuitous ARP to update neighbours' caches
      utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
                    master_ip, master_ip])
    elif ipcls == netutils.IP6Address:
        # IPv6 equivalent: unsolicited neighbour advertisement
        utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
      except errors.OpExecError:
        # TODO: Better error reporting
        logging.warning("Can't execute ndisc6, please install if missing")
def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """
    # --yes-do-it is required because skipping the vote is dangerous
    masterd_args = "--no-voting --yes-do-it"

      "EXTRA_MASTERD_ARGS": masterd_args,

  result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    msg = "Can't start Ganeti master: %s" % result.output
def DeactivateMasterIp():
  """Deactivate the master IP on this node.

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  # Pick the address class matching the cluster's primary IP family
  ipcls = netutils.IP4Address
  if family == netutils.IP6Address.family:
    ipcls = netutils.IP6Address

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%d" % (master_ip, ipcls.iplen),
                         "dev", master_netdev])
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure
def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    # Failure to stop is only logged, not raised
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  result.cmd, result.exit_code, result.output)
def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      # BUGFIX: RPCFail was previously instantiated but never raised, so
      # the parameter-validation errors in this function were silently
      # ignored; the exception must actually be raised
      raise RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
                    " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      raise RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
                    " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    raise RPCFail("Mode not supported")
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)

      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      # Best effort: log and continue with the rest of the cleanup
      logging.exception("Error while processing ssh files")

    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CACERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB
      - hv_version: the hypervisor version, if available

  """
  if vgname is not None:
    vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
    # Both values stay None if the VG cannot be queried
    vg_free = vg_size = None
      vg_free = int(round(vginfo[0][0], 0))
      vg_size = int(round(vginfo[0][1], 0))
    outputarray["vg_size"] = vg_size
    outputarray["vg_free"] = vg_free

  if hypervisor_type is not None:
    hyper = hypervisor.GetHypervisor(hypervisor_type)
    hyp_info = hyper.GetNodeInfo()
    if hyp_info is not None:
      outputarray.update(hyp_info)

  # boot_id changes on every boot, allowing reboot detection by the master
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  # Checks marked "and vm_capable" below are skipped on non-VM-capable nodes
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)

  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
      nodes.extend(bynode[my_name])

    random.shuffle(nodes)

    # Try to contact all nodes
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    # First pass: find our own primary/secondary IPs in the list
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
      # Second pass: ping every node from our own addresses
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
          tmp[name] = ("failure using the %s interface(s)" %

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
        tmp.append("error stating out of band helper: %s" % err)
      # The helper must be a regular, user-executable file
      if stat.S_ISREG(st.st_mode):
        if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
          tmp.append("out of band helper %s is not executable" % path)
        tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what and vm_capable:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      # On error the string form of the error is reported instead
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    # Only the *missing* bridges are reported
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]
def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  for devpath in devices:
    # Safety check: only paths below /dev are accepted
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)

      # blockdev reports bytes; convert to MiB
      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    match = _LVSLINE_REGEX.match(line)
      logging.error("Invalid line returned from lvs output: '%s'", line)
    vg_name, name, size, attr = match.groups()
    # lv_attr positions: [0] volume type ('v' = virtual),
    # [4] state ('-' = inactive), [5] device open ('o' = online)
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
      # we don't want to report such volumes as existing, since they
      # don't really hold data
    lvs[vg_name + "/" + name] = (size, inactive, online)
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  # Thin wrapper around the utils implementation
  return utils.ListVolumeGroups()
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--options=lv_name,lv_size,devices,vg_name"])
    _Fail("Failed to list logical volumes, lvs output: %s",

    # Strip the "(extent)" suffix from a single device spec
    return dev.split("(")[0]

    # An LV can span several comma-separated devices
    return [parse_dev(x) for x in dev.split(",")]

    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
      logging.warning("Strange line in the output from lvs: '%s'", line)
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

    # All missing bridges are reported together in a single failure
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  for hname in hypervisor_list:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # iinfo tuple layout: indices 2/4/5 are memory, state, cpu time
    output["memory"] = iinfo[2]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # Each disk must already have its symlink in place for migration
  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
      for name, _, memory, vcpus, state, times in iinfo:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      imported/added/etc.

  """
  # TODO: Use tempfile.mkstemp to create unique filename
    # Component names become a "-<component>" infix in the log file name
    assert "/" not in component
    c_msg = "-%s" % component
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, base)
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
    # Reinstalls are flagged to the OS create script via the environment
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
    # Include the tail of the log in the failure report; log=False
    # because the error was already logged above
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  # The rename script learns the previous name via the environment
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    # BUGFIX: the log message previously said "os create command" -
    # copy-pasted from InstanceOsAdd; this runs the *rename* script
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    # log=False: the failure was already logged above
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for an instance's disk at index C{idx}."""
  link_basename = "%s%s%d" % (instance_name, constants.DISK_SEPARATOR, idx)
  return utils.PathJoin(constants.DISK_LINKS_DIR, link_basename)
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      # A link already exists; replace it only if it does not already
      # point at the wanted device
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @param instance_name: the name of the instance whose links are removed
  @param disks: the instance's disk objects (only their count is used)

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
        os.remove(link_name)
        # Best effort: a failed unlink is only logged, never raised
        logging.exception("Can't remove symlink '%s'", link_name)
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we shoul assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
    block_devices.append((disk, link_name))

  return block_devices
def StartInstance(instance, startup_paused):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  # Starting an already-running instance is a no-op, not an error
  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)

    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    # Undo the disk symlinks before reporting the failure
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)
def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  # Shutting down a stopped instance is a no-op, not an error
  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)

      # Retry state: subsequent calls pass retry=True to the hypervisor
      self.tried_once = False

      if iname not in hyper.ListInstances():

        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

    # Poll the soft shutdown every 5 seconds, up to the given timeout
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)
def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
      # Hard reboot is implemented as a full stop + start cycle
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance, False)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
    _Fail("Invalid reboot_type received: %s", reboot_type)
def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
    # Opaque, hypervisor-specific data to hand over to the target node.
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    # Roll back the symlinks created above before failing.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)
def FinalizeMigration(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.FinalizeMigration(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration: %s", err, exc=True)
def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @rtype: tuple
  @return: a tuple of (success, msg) where:
      - success is a boolean denoting the success/failure of the operation
      - msg is a string with details in case of failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

    # The hypervisor decides how to honour (or ignore) the live flag.
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
    # Assemble (and possibly open) each child device first.
    for child in disk.children:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
          # pylint: disable=E1103
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)

    # Create the actual device on top of the assembled children.
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id
def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Zero the requested region via dd; oflag=direct bypasses the page
  # cache so the zeroes hit the device itself.
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
  result = utils.RunCmd(cmd)

    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Do cross verify some of the parameters
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)
def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disks object we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      # Record a per-disk failure instead of aborting the whole batch.
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))

    result = rdev.PauseResumeSync(pause)

      success.append((result, None))
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
  if rdev is not None:
    r_path = rdev.dev_path
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
      # Drop the cache entry only after a successful removal.
      DevCacheManager.RemoveCache(r_path)

    # Recurse into the children; collect (not raise) their errors.
    for child in disk.children:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

    _Fail("; ".join(msgs))
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
    mcn = disk.ChildrenNeeded()
      mcn = 0 # max number of Nones allowed
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # Tolerate failed children up to the redundancy limit (mcn).
        if children.count(None) >= mcn:
        logging.error("Error in child activation (but continuing): %s",
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    if as_primary or disk.OpenOnSecondary():
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
        # On the primary also create the /var/run symlink for the disk.
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)
def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

    # Recurse into the children; collect (not raise) their errors.
    for child in disk.children:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

    _Fail("; ".join(msgs))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add

  """
  # The parent must already be assembled on this node.
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  # Resolve every new child; all of them must be found.
  new_bdevs = []
  for disk in new_cdevs:
    new_bdevs.append(_RecursiveFindBD(disk))
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  # Resolve each child to a kernel device path before detaching it.
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
      bd = _RecursiveFindBD(disk)
        _Fail("Can't find device %s while removing children", disk)
        devs.append(bd.dev_path)
      # A static path is used as-is, but must be a normalized abs path.
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
  parent_bdev.RemoveChildren(devs)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query

  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
    rbd = _RecursiveFindBD(dsk)
      # Any missing device aborts the whole query.
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())
def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query

  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
      rbd = _RecursiveFindBD(disk)
        # Missing device yields a per-disk failure, not an RPC failure.
        result.append((False, "Can't find device %s" % disk))

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
      result.append((True, status))

  # Exactly one result entry must exist per input disk.
  assert len(disks) == len(result)
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
    # Resolve all children first, so FindDevice can match the stack.
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)
def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise a the current
      sync status

  """
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  return rbd.GetSyncStatus()
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      # Size as reported by the device itself, not the config.
      result.append(rbd.GetActualSize())
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  # Only whitelisted configuration files may be overwritten remotely.
  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  # Resolve symbolic owner/group names to numeric ids locally.
  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)
def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  if hasattr(err, "errno"):
    # e.g. 2 -> "ENOENT"
    detail = errno.errorcode[err.errno]
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS

  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

    # One supported API version per line.
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %

  return True, api_versions
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of support OS API versions

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        # Unreadable search dirs are logged and skipped, not fatal.
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.

  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
    # Pre-2.0 API: the verify script is not part of the contract.
    del os_files[constants.OS_SCRIPT_VERIFY]

  # Check presence, type and permissions of every expected OS file.
  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %

    if filename in constants.OS_SCRIPTS:
      # Scripts must be executable by their owner.
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %

  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, _ErrnoOrStr(err)))

  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, _ErrnoOrStr(err)))
    # Each line is "name help-text" -> (name, help) pairs.
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # Strip a possible "+variant" suffix before the on-disk lookup.
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)
def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables

  """
    # Highest API version supported by both this node and the OS.
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
      # Fall back to the first declared variant.
      variant = inst_os.supported_variants[0]
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks: expose path, access mode and backend/frontend types.
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs: MAC/IP/mode plus bridge and frontend info where applicable.
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # Backend and hypervisor parameters, flattened into the environment.
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
def BlockdevGrow(disk, amount, dryrun):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the errors message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
    _Fail("Cannot find block device %s", disk)

    r_dev.Grow(amount, dryrun)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted

  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
    # Recurse into the DRBD backing storage (its first child).
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
      _Fail("Cannot find block device %s", disk)
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  """
  # Build the export in a "<name>.new" directory; the final directory is
  # replaced wholesale at the end (rmtree + move below)
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # export-level metadata section
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  # instance-level section: identity, sizing and device layout
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
               ("%s" % disk.iv_name))
    config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
               ("%s" % disk.physical_id[1]))
    config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    # globals are cluster-wide and must not be frozen into the export
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # replace any previous export of this instance with the new one
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()

  # a valid export must contain both the export and the instance sections
  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  # serialized (string) form, suitable for shipping over RPC
  return config.Dumps()
  """Return a list of exports currently available on this machine.

  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    # only non-hidden entries count as exports; sorted for stable output
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
    _Fail("No exports directory")
def RemoveExport(export):
  """Remove an existing export from the node.

  @param export: the name of the export to remove

  """
  target = utils.PathJoin(constants.EXPORT_DIR, export)

    # recursively delete the whole export directory
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)
def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
  @return: True if all renames succeeded, False otherwise

  """
  # errors are collected per-device so all renames are attempted
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
      msgs.append("Can't find device %s in rename" % str(disk))
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      # the device path changed, so the old cache entry is stale
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
    # a single combined failure message for all devices
    _Fail("; ".join(msgs))
def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks wheter the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  # normalize first so the IsBelowDir checks can't be fooled by ".." etc.
  fs_dir = os.path.normpath(fs_dir)
  # NOTE(review): "cfg" presumably comes from the node's SimpleStore config —
  # confirm where it is initialized
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
          utils.IsBelowDir(base_shared, fs_dir)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          fs_dir, base_fstore, base_shared)
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @return: tuple with first element a boolean indicating wheter dir
      creation was successful or not

  """
  # validates the path is under the allowed storage roots
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
      # 0750: owner full access, group read/execute, others none
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  # validates the path is under the allowed storage roots
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified Storage directory '%s' is not a directory",
    # deletes dir only if empty, otherwise we want to fail the rpc call
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  # both paths must be under the allowed storage roots
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  # refuse to clobber an existing destination
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check

  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # file is valid iff the queue dir is a prefix of its (normalized) path
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper

  @type file_name: str
  @param file_name: the job file name
  @param content: the new job contents

  @return: the success of the operation

  """
  # refuse paths outside the queue directory (raises RPCFail)
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  # content arrives compressed over RPC; owned by the master daemon user
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @param old: the old (actual) file name
  @param new: the desired file name

  @return: the success of the operation and payload

  """
  # both source and destination must live inside the queue directory
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  # mkdir=True: create the target (archive) subdirectory if missing
  utils.RenameFile(old, new, mkdir=True)
def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the succes of the operation, and message
      which will contain the error details in case we

  """
    rd = _RecursiveFindBD(cf)
      _Fail("Can't find device %s", cf)
    except errors.BlockDeviceError, err:
      # collect per-device errors, fail once at the end
      msg.append(str(err))
    _Fail("Can't make devices secondary: %s", ",".join(msg))
    # all devices closed: the instance symlinks are no longer needed
    _RemoveBlockDevLinks(instance_name, disks)
def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated

  """
    # delegate to the hypervisor-specific validation
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    # log=False: the caller reports this; avoid duplicate log entries
    _Fail(str(err), log=False)
def _CheckOSPList(os_obj, parameters):
  """Verify that an OS supports every parameter in a given list.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  # supported_parameters holds (name, description) pairs; keep the names
  known = frozenset(entry[0] for entry in os_obj.supported_parameters)
  unsupported = frozenset(parameters) - known
  if unsupported:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(unsupported)))
def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
  @type osname: string
  @param osname: the OS to be validated
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters

  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  # strip any "+variant" suffix before looking the OS up on disk
  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  # older API versions have no validation support
  if max(tbv.api_versions) < constants.OS_API_V20:

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  # reset_env=True: run the script with only the OS environment we built
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  # also refuse if the master daemon is live on this node
  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

    # keep a backup of the configuration before removing it
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    # a missing config file is fine; anything else is a real error
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  @type cryptodir: string
  @param cryptodir: directory holding the X509 material
  @type name: string
  @param name: certificate (directory) name
  @rtype: tuple
  @return: (certificate directory, key file path, certificate file path)

  """
  cert_dir = utils.PathJoin(cryptodir, name)
  key_file = utils.PathJoin(cert_dir, _X509_KEY_FILE)
  cert_file = utils.PathJoin(cert_dir, _X509_CERT_FILE)
  return (cert_dir, key_file, cert_file)
def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  # cap the validity at the module-wide maximum (_MAX_SSL_CERT_VALIDITY)
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  # unique, timestamped directory per certificate
  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())

  name = os.path.basename(cert_dir)
  assert len(name) > 5

  (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  # mode 0400: readable only by the owner — this is key material
  utils.WriteFile(key_file, mode=0400, data=key_pem)
  utils.WriteFile(cert_file, mode=0400, data=cert_pem)

  # Never return private key as it shouldn't leave the node
  return (name, cert_pem)
  # on failure, remove the partially-created directory
  shutil.rmtree(cert_dir, ignore_errors=True)
def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Removes a X509 certificate.

  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  # remove key and certificate first, then the containing directory
  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    # resolve symlinks so the containment check below can't be bypassed
    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, constants.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    # import reads from the pipe into the file; export feeds the file in
    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

    # Retrieve file size
      st = os.stat(filename)
    except EnvironmentError, err:
      logging.error("Can't stat(2) %s: %s", filename, err)
      exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly ignored; we use nocreat to fail if the device
      # is not already there or we pass a wrong path; we use notrunc to no
      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
      # much memory; this means that at best, we flush every 64k, which will
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   str(1024 * 1024), # 1 MB
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    # the OS-provided import/export scripts run with the OS environment
    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

      # Let script predict size
      exp_size = constants.IE_CUSTOM_SIZE

    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)
def _CreateImportExportStatusDir(prefix):
  """Creates a fresh status directory for an import/export daemon.

  @type prefix: string
  @param prefix: human-readable tag included in the directory name
  @rtype: string
  @return: path of the newly created directory

  """
  dir_prefix = "%s-%s-" % (prefix, utils.TimestampForFilename())
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR, prefix=dir_prefix)
def StartImportExportDaemon(mode, opts, host, port, instance, component,
  """Starts an import or export daemon.

  @param mode: Import/output mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @param host: Remote host for export (None for import)
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  # imports listen locally, exports dial out: host/port rules differ
  if mode == constants.IEM_IMPORT:

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

    _Fail("Invalid mode %r", mode)

  # key and CA must be given (or omitted) together
  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem (the node daemon certificate)
    key_path = constants.NODED_CERT_FILE
    cert_path = constants.NODED_CERT_FILE
    assert opts.ca_pem is None
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))

    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem as CA when no explicit CA was given
      ca = utils.ReadFile(constants.NODED_CERT_FILE)

    # mode 0400: the CA copy is private to the daemon
    utils.WriteFile(ca_file, data=ca, mode=0400)

      constants.IMPORT_EXPORT_DAEMON,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,

      cmd.append("--host=%s" % host)

      cmd.append("--port=%s" % port)

      cmd.append("--ipv6")
      cmd.append("--ipv4")

      cmd.append("--compress=%s" % opts.compress)

      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

      cmd.append("--cmd-prefix=%s" % cmd_prefix)

      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

    # start failed: clean up the status directory we created above
    shutil.rmtree(status_dir, ignore_errors=True)
def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if a
      status couldn't be read

  """
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,

      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      # a missing status file is expected (daemon not started yet)
      if err.errno != errno.ENOENT:

    # status files contain JSON-serialized daemon state
    result.append(serializer.LoadJson(data))
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  # pid is read from the daemon's locked pidfile in its status directory
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
    # the process may already be gone; that's not an error
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

    logging.info("Import/export %s is still running with PID %s",
    # don't wait for the process: the rmtree below is safe regardless
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
    cf.SetPhysicalID(my_name, nodes_ip)

    rd = _RecursiveFindBD(cf)
      _Fail("Can't find device %s", cf)
def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect each device; any failure aborts the whole call
  except errors.BlockDeviceError, err:
    _Fail("Can't change network configuration to standalone mode: %s",
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

    # in multimaster mode the instance may run here, so create its symlinks
    for idx, rd in enumerate(bdevs):
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)

  # reconnect disks, switch to new master configuration and if
  # needed primary mode
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

    all_connected = True
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

    # Start with a delay of 100 miliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

    # change to primary mode
    except errors.BlockDeviceError, err:
      _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
    # retry while the device is neither connected nor resyncing
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()

  bdevs = _FindDisks(nodes_ip, disks)

      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      # one last check before declaring the device out of sync
      stats = rd.GetProcStatus()

      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  # if we can't fork, we'll pretend that we're in the child process
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running on ram
  except Exception: # pylint: disable=W0703
  # hypervisor-specific hard powercycle (e.g. sysrq-based reboot)
  hyper.PowercycleNode()
class HooksRunner(object):
  This class is instantiated on the node side (ganeti-noded) and not
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @param hpath: the path to the hooks directory which
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @param env: dictionary with the environment for the hook
    @return: list of 3-element tuples:
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input

    """
    # map the phase to the "pre"/"post" directory-name suffix
    if phase == constants.HOOKS_PHASE_PRE:
    elif phase == constants.HOOKS_PHASE_POST:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation

    # run every script in the directory with a clean environment
    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  def Run(name, idata):
    """Run an iallocator script.

    @param name: the iallocator script name
    @param idata: the allocator input data

    @return: two element tuple of:
       - either error message or stdout of allocator (for success)

    """
    # the script must be found in the configured search path
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,

    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # input data is passed via a temporary file given as argv[1]
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
      result = utils.RunCmd([alloc_script, fin_name])
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)

    return result.stdout
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  # prefix stripped from device paths when building cache file names
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @param dev_path: the C{/dev/} path name

    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)

  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @param dev_path: the pathname of the device
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    """
    # a None path is a caller bug; log and skip instead of crashing
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
    fpath = cls._ConvertPath(dev_path)
      iv_name = "not_visible"
    # cache line format: "<owner> <state> <iv_name>"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      # cache updates are best-effort; log but don't fail the operation
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @param dev_path: the pathname of the device

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
    fpath = cls._ConvertPath(dev_path)
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      # removal is best-effort as well
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)