#
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint
import base64
import logging
import os
import os.path
import random
import re
import stat
import time
import zlib

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti.storage import bdev
from ganeti.storage import drbd
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster

from ganeti.storage.base import BlockDev
from ganeti.storage.drbd import DRBD8
from ganeti import hooksmaster
73 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
74 _ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
76 pathutils.JOB_QUEUE_ARCHIVE_DIR,
78 pathutils.CRYPTO_KEYS_DIR,
80 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
81 _X509_KEY_FILE = "key"
82 _X509_CERT_FILE = "cert"
83 _IES_STATUS_FILE = "status"
87 #: Valid LVS output line regex
88 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
90 # Actions for the master setup script
91 _MASTER_START = "start"
94 #: Maximum file permissions for restricted command directory and executables
95 _RCMD_MAX_MODE = (stat.S_IRWXU |
96 stat.S_IRGRP | stat.S_IXGRP |
97 stat.S_IROTH | stat.S_IXOTH)
99 #: Delay before returning an error for restricted commands
100 _RCMD_INVALID_DELAY = 10
102 #: How long to wait to acquire lock for restricted commands (shorter than
103 #: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
104 #: command requests arrive
105 _RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
108 class RPCFail(Exception):
109 """Class denoting RPC failure.
111 Its argument is the error message.
116 def _GetInstReasonFilename(instance_name):
117 """Path of the file containing the reason of the instance status change.
119 @type instance_name: string
120 @param instance_name: The name of the instance
122 @return: The path of the file
125 return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)
128 def _StoreInstReasonTrail(instance_name, trail):
129 """Serialize a reason trail related to an instance change of state to file.
131 The exact location of the file depends on the name of the instance and on
132 the configuration of the Ganeti cluster defined at deploy time.
134 @type instance_name: string
135 @param instance_name: The name of the instance
139 json = serializer.DumpJson(trail)
140 filename = _GetInstReasonFilename(instance_name)
141 utils.WriteFile(filename, data=json)
144 def _Fail(msg, *args, **kwargs):
145 """Log an error and the raise an RPCFail exception.
147 This exception is then handled specially in the ganeti daemon and
148 turned into a 'failed' return type. As such, this function is a
149 useful shortcut for logging the error and returning it to the master
153 @param msg: the text of the exception
159 if "log" not in kwargs or kwargs["log"]: # if we should log this error
160 if "exc" in kwargs and kwargs["exc"]:
161 logging.exception(msg)
168 """Simple wrapper to return a SimpleStore.
170 @rtype: L{ssconf.SimpleStore}
171 @return: a SimpleStore instance
174 return ssconf.SimpleStore()
177 def _GetSshRunner(cluster_name):
178 """Simple wrapper to return an SshRunner.
180 @type cluster_name: str
181 @param cluster_name: the cluster name, which is needed
182 by the SshRunner constructor
183 @rtype: L{ssh.SshRunner}
184 @return: an SshRunner instance
187 return ssh.SshRunner(cluster_name)
190 def _Decompress(data):
191 """Unpacks data compressed by the RPC client.
193 @type data: list or tuple
194 @param data: Data sent by RPC client
196 @return: Decompressed data
199 assert isinstance(data, (list, tuple))
200 assert len(data) == 2
201 (encoding, content) = data
202 if encoding == constants.RPC_ENCODING_NONE:
204 elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
205 return zlib.decompress(base64.b64decode(content))
207 raise AssertionError("Unknown data encoding")
210 def _CleanDirectory(path, exclude=None):
211 """Removes all regular files in a directory.
214 @param path: the directory to clean
216 @param exclude: list of files to be excluded, defaults
220 if path not in _ALLOWED_CLEAN_DIRS:
221 _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
224 if not os.path.isdir(path):
229 # Normalize excluded paths
230 exclude = [os.path.normpath(i) for i in exclude]
232 for rel_name in utils.ListVisibleFiles(path):
233 full_name = utils.PathJoin(path, rel_name)
234 if full_name in exclude:
236 if os.path.isfile(full_name) and not os.path.islink(full_name):
237 utils.RemoveFile(full_name)
240 def _BuildUploadFileList():
241 """Build the list of allowed upload files.
243 This is abstracted so that it's built only once at module import time.
246 allowed_files = set([
247 pathutils.CLUSTER_CONF_FILE,
249 pathutils.SSH_KNOWN_HOSTS_FILE,
250 pathutils.VNC_PASSWORD_FILE,
251 pathutils.RAPI_CERT_FILE,
252 pathutils.SPICE_CERT_FILE,
253 pathutils.SPICE_CACERT_FILE,
254 pathutils.RAPI_USERS_FILE,
255 pathutils.CONFD_HMAC_KEY,
256 pathutils.CLUSTER_DOMAIN_SECRET_FILE,
259 for hv_name in constants.HYPER_TYPES:
260 hv_class = hypervisor.GetHypervisorClass(hv_name)
261 allowed_files.update(hv_class.GetAncillaryFiles()[0])
263 assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
264 "Allowed file storage paths should never be uploaded via RPC"
266 return frozenset(allowed_files)
269 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
273 """Removes job queue files and archived jobs.
279 _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
280 _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
284 """Returns master information.
286 This is an utility function to compute master information, either
287 for consumption here or from the node daemon.
290 @return: master_netdev, master_ip, master_name, primary_ip_family,
292 @raise RPCFail: in case of errors
297 master_netdev = cfg.GetMasterNetdev()
298 master_ip = cfg.GetMasterIP()
299 master_netmask = cfg.GetMasterNetmask()
300 master_node = cfg.GetMasterNode()
301 primary_ip_family = cfg.GetPrimaryIPFamily()
302 except errors.ConfigurationError, err:
303 _Fail("Cluster configuration incomplete: %s", err, exc=True)
304 return (master_netdev, master_ip, master_node, primary_ip_family,
308 def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
309 """Decorator that runs hooks before and after the decorated function.
311 @type hook_opcode: string
312 @param hook_opcode: opcode of the hook
313 @type hooks_path: string
314 @param hooks_path: path of the hooks
315 @type env_builder_fn: function
316 @param env_builder_fn: function that returns a dictionary containing the
317 environment variables for the hooks. Will get all the parameters of the
319 @raise RPCFail: in case of pre-hook failure
323 def wrapper(*args, **kwargs):
324 _, myself = ssconf.GetMasterAndMyself()
325 nodes = ([myself], [myself]) # these hooks run locally
327 env_fn = compat.partial(env_builder_fn, *args, **kwargs)
331 hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
332 hr.RunLocalHooks, None, env_fn,
333 logging.warning, cfg.GetClusterName(),
335 hm.RunPhase(constants.HOOKS_PHASE_PRE)
336 result = fn(*args, **kwargs)
337 hm.RunPhase(constants.HOOKS_PHASE_POST)
344 def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
345 """Builds environment variables for master IP hooks.
347 @type master_params: L{objects.MasterNetworkParameters}
348 @param master_params: network parameters of the master
349 @type use_external_mip_script: boolean
350 @param use_external_mip_script: whether to use an external master IP
351 address setup script (unused, but necessary per the implementation of the
352 _RunLocalHooks decorator)
355 # pylint: disable=W0613
356 ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
358 "MASTER_NETDEV": master_params.netdev,
359 "MASTER_IP": master_params.ip,
360 "MASTER_NETMASK": str(master_params.netmask),
361 "CLUSTER_IP_VERSION": str(ver),
367 def _RunMasterSetupScript(master_params, action, use_external_mip_script):
368 """Execute the master IP address setup script.
370 @type master_params: L{objects.MasterNetworkParameters}
371 @param master_params: network parameters of the master
373 @param action: action to pass to the script. Must be one of
374 L{backend._MASTER_START} or L{backend._MASTER_STOP}
375 @type use_external_mip_script: boolean
376 @param use_external_mip_script: whether to use an external master IP
378 @raise backend.RPCFail: if there are errors during the execution of the
382 env = _BuildMasterIpEnv(master_params)
384 if use_external_mip_script:
385 setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
387 setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT
389 result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
392 _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
393 (action, result.exit_code, result.output), log=True)
396 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
398 def ActivateMasterIp(master_params, use_external_mip_script):
399 """Activate the IP address of the master daemon.
401 @type master_params: L{objects.MasterNetworkParameters}
402 @param master_params: network parameters of the master
403 @type use_external_mip_script: boolean
404 @param use_external_mip_script: whether to use an external master IP
406 @raise RPCFail: in case of errors during the IP startup
409 _RunMasterSetupScript(master_params, _MASTER_START,
410 use_external_mip_script)
413 def StartMasterDaemons(no_voting):
414 """Activate local node as master node.
416 The function will start the master daemons (ganeti-masterd and ganeti-rapi).
418 @type no_voting: boolean
419 @param no_voting: whether to start ganeti-masterd without a node vote
420 but still non-interactively
426 masterd_args = "--no-voting --yes-do-it"
431 "EXTRA_MASTERD_ARGS": masterd_args,
434 result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
436 msg = "Can't start Ganeti master: %s" % result.output
441 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
443 def DeactivateMasterIp(master_params, use_external_mip_script):
444 """Deactivate the master IP on this node.
446 @type master_params: L{objects.MasterNetworkParameters}
447 @param master_params: network parameters of the master
448 @type use_external_mip_script: boolean
449 @param use_external_mip_script: whether to use an external master IP
451 @raise RPCFail: in case of errors during the IP turndown
454 _RunMasterSetupScript(master_params, _MASTER_STOP,
455 use_external_mip_script)
458 def StopMasterDaemons():
459 """Stop the master daemons on this node.
461 Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
466 # TODO: log and report back to the caller the error failures; we
467 # need to decide in which case we fail the RPC for this
469 result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
471 logging.error("Could not stop Ganeti master, command %s had exitcode %s"
473 result.cmd, result.exit_code, result.output)
476 def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
477 """Change the netmask of the master IP.
479 @param old_netmask: the old value of the netmask
480 @param netmask: the new value of the netmask
481 @param master_ip: the master IP
482 @param master_netdev: the master network device
485 if old_netmask == netmask:
488 if not netutils.IPAddress.Own(master_ip):
489 _Fail("The master IP address is not up, not attempting to change its"
492 result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
493 "%s/%s" % (master_ip, netmask),
494 "dev", master_netdev, "label",
495 "%s:0" % master_netdev])
497 _Fail("Could not set the new netmask on the master IP address")
499 result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
500 "%s/%s" % (master_ip, old_netmask),
501 "dev", master_netdev, "label",
502 "%s:0" % master_netdev])
504 _Fail("Could not bring down the master IP address with the old netmask")
507 def EtcHostsModify(mode, host, ip):
508 """Modify a host entry in /etc/hosts.
510 @param mode: The mode to operate. Either add or remove entry
511 @param host: The host to operate on
512 @param ip: The ip associated with the entry
515 if mode == constants.ETC_HOSTS_ADD:
517 RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
519 utils.AddHostToEtcHosts(host, ip)
520 elif mode == constants.ETC_HOSTS_REMOVE:
522 RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
523 " parameter is present")
524 utils.RemoveHostFromEtcHosts(host)
526 RPCFail("Mode not supported")
529 def LeaveCluster(modify_ssh_setup):
530 """Cleans up and remove the current node.
532 This function cleans up and prepares the current node to be removed
535 If processing is successful, then it raises an
536 L{errors.QuitGanetiException} which is used as a special case to
537 shutdown the node daemon.
539 @param modify_ssh_setup: boolean
542 _CleanDirectory(pathutils.DATA_DIR)
543 _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
548 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
550 utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
552 utils.RemoveFile(priv_key)
553 utils.RemoveFile(pub_key)
554 except errors.OpExecError:
555 logging.exception("Error while processing ssh files")
558 utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
559 utils.RemoveFile(pathutils.RAPI_CERT_FILE)
560 utils.RemoveFile(pathutils.SPICE_CERT_FILE)
561 utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
562 utils.RemoveFile(pathutils.NODED_CERT_FILE)
563 except: # pylint: disable=W0702
564 logging.exception("Error while removing cluster secrets")
566 result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
568 logging.error("Command %s failed with exitcode %s and error %s",
569 result.cmd, result.exit_code, result.output)
571 # Raise a custom exception (handled in ganeti-noded)
572 raise errors.QuitGanetiException(True, "Shutdown scheduled")
575 def _GetVgInfo(name, excl_stor):
576 """Retrieves information about a LVM volume group.
579 # TODO: GetVGInfo supports returning information for multiple VGs at once
580 vginfo = bdev.LogicalVolume.GetVGInfo([name], excl_stor)
582 vg_free = int(round(vginfo[0][0], 0))
583 vg_size = int(round(vginfo[0][1], 0))
595 def _GetVgSpindlesInfo(name, excl_stor):
596 """Retrieves information about spindles in an LVM volume group.
600 @type excl_stor: bool
601 @param excl_stor: exclusive storage
603 @return: dictionary whose keys are "name", "vg_free", "vg_size" for VG name,
604 free spindles, total spindles respectively
608 (vg_free, vg_size) = bdev.LogicalVolume.GetVgSpindlesInfo(name)
619 def _GetHvInfo(name):
620 """Retrieves node information from a hypervisor.
622 The information returned depends on the hypervisor. Common items:
624 - vg_size is the size of the configured volume group in MiB
625 - vg_free is the free size of the volume group in MiB
626 - memory_dom0 is the memory allocated for domain0 in MiB
627 - memory_free is the currently available (free) ram in MiB
628 - memory_total is the total number of ram in MiB
629 - hv_version: the hypervisor version, if available
632 return hypervisor.GetHypervisor(name).GetNodeInfo()
635 def _GetNamedNodeInfo(names, fn):
636 """Calls C{fn} for all names in C{names} and returns a dictionary.
644 return map(fn, names)
647 def GetNodeInfo(storage_units, hv_names, excl_stor):
648 """Gives back a hash with different information about the node.
650 @type storage_units: list of pairs (string, string)
651 @param storage_units: List of pairs (storage unit, identifier) to ask for disk
652 space information. In case of lvm-vg, the identifier is
654 @type hv_names: list of string
655 @param hv_names: Names of the hypervisors to ask for node information
656 @type excl_stor: boolean
657 @param excl_stor: Whether exclusive_storage is active
658 @rtype: tuple; (string, None/dict, None/dict)
659 @return: Tuple containing boot ID, volume group information and hypervisor
663 bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
664 storage_info = _GetNamedNodeInfo(
666 (lambda storage_unit: _ApplyStorageInfoFunction(storage_unit[0],
669 hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)
671 return (bootid, storage_info, hv_info)
674 # FIXME: implement storage reporting for all missing storage types.
675 _STORAGE_TYPE_INFO_FN = {
676 constants.ST_BLOCK: None,
677 constants.ST_DISKLESS: None,
678 constants.ST_EXT: None,
679 constants.ST_FILE: None,
680 constants.ST_LVM_PV: _GetVgSpindlesInfo,
681 constants.ST_LVM_VG: _GetVgInfo,
682 constants.ST_RADOS: None,
686 def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
687 """Looks up and applies the correct function to calculate free and total
688 storage for the given storage type.
690 @type storage_type: string
691 @param storage_type: the storage type for which the storage shall be reported.
692 @type storage_key: string
693 @param storage_key: identifier of a storage unit, e.g. the volume group name
694 of an LVM storage unit
696 @param args: various parameters that can be used for storage reporting. These
697 parameters and their semantics vary from storage type to storage type and
698 are just propagated in this function.
699 @return: the results of the application of the storage space function (see
700 _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
702 @raises NotImplementedError: for storage types who don't support space
705 fn = _STORAGE_TYPE_INFO_FN[storage_type]
707 return fn(storage_key, *args)
709 raise NotImplementedError
712 def _CheckExclusivePvs(pvi_list):
713 """Check that PVs are not shared among LVs
715 @type pvi_list: list of L{objects.LvmPvInfo} objects
716 @param pvi_list: information about the PVs
718 @rtype: list of tuples (string, list of strings)
719 @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])
724 if len(pvi.lv_list) > 1:
725 res.append((pvi.name, pvi.lv_list))
729 def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
730 get_hv_fn=hypervisor.GetHypervisor):
731 """Verifies the hypervisor. Appends the results to the 'results' list.
734 @param what: a dictionary of things to check
735 @type vm_capable: boolean
736 @param vm_capable: whether or not this node is vm capable
738 @param result: dictionary of verification results; results of the
739 verifications in this function will be added here
740 @type all_hvparams: dict of dict of string
741 @param all_hvparams: dictionary mapping hypervisor names to hvparams
742 @type get_hv_fn: function
743 @param get_hv_fn: function to retrieve the hypervisor, to improve testability
749 if constants.NV_HYPERVISOR in what:
750 result[constants.NV_HYPERVISOR] = {}
751 for hv_name in what[constants.NV_HYPERVISOR]:
752 hvparams = all_hvparams[hv_name]
754 val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
755 except errors.HypervisorError, err:
756 val = "Error while checking hypervisor: %s" % str(err)
757 result[constants.NV_HYPERVISOR][hv_name] = val
760 def _VerifyHvparams(what, vm_capable, result,
761 get_hv_fn=hypervisor.GetHypervisor):
762 """Verifies the hvparams. Appends the results to the 'results' list.
765 @param what: a dictionary of things to check
766 @type vm_capable: boolean
767 @param vm_capable: whether or not this node is vm capable
769 @param result: dictionary of verification results; results of the
770 verifications in this function will be added here
771 @type get_hv_fn: function
772 @param get_hv_fn: function to retrieve the hypervisor, to improve testability
778 if constants.NV_HVPARAMS in what:
779 result[constants.NV_HVPARAMS] = []
780 for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
782 logging.info("Validating hv %s, %s", hv_name, hvparms)
783 get_hv_fn(hv_name).ValidateParameters(hvparms)
784 except errors.HypervisorError, err:
785 result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))
788 def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
789 """Verifies the instance list.
792 @param what: a dictionary of things to check
793 @type vm_capable: boolean
794 @param vm_capable: whether or not this node is vm capable
796 @param result: dictionary of verification results; results of the
797 verifications in this function will be added here
798 @type all_hvparams: dict of dict of string
799 @param all_hvparams: dictionary mapping hypervisor names to hvparams
802 if constants.NV_INSTANCELIST in what and vm_capable:
803 # GetInstanceList can fail
805 val = GetInstanceList(what[constants.NV_INSTANCELIST],
806 all_hvparams=all_hvparams)
809 result[constants.NV_INSTANCELIST] = val
812 def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
813 """Verifies the node info.
816 @param what: a dictionary of things to check
817 @type vm_capable: boolean
818 @param vm_capable: whether or not this node is vm capable
820 @param result: dictionary of verification results; results of the
821 verifications in this function will be added here
822 @type all_hvparams: dict of dict of string
823 @param all_hvparams: dictionary mapping hypervisor names to hvparams
826 if constants.NV_HVINFO in what and vm_capable:
827 hvname = what[constants.NV_HVINFO]
828 hyper = hypervisor.GetHypervisor(hvname)
829 hvparams = all_hvparams[hvname]
830 result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)
833 def VerifyNode(what, cluster_name, all_hvparams):
834 """Verify the status of the local node.
836 Based on the input L{what} parameter, various checks are done on the
839 If the I{filelist} key is present, this list of
840 files is checksummed and the file/checksum pairs are returned.
842 If the I{nodelist} key is present, we check that we have
843 connectivity via ssh with the target nodes (and check the hostname
846 If the I{node-net-test} key is present, we check that we have
847 connectivity to the given nodes via both primary IP and, if
848 applicable, secondary IPs.
851 @param what: a dictionary of things to check:
852 - filelist: list of files for which to compute checksums
853 - nodelist: list of nodes we should check ssh communication with
854 - node-net-test: list of nodes we should check node daemon port
856 - hypervisor: list with hypervisors to run the verify for
857 @type cluster_name: string
858 @param cluster_name: the cluster's name
859 @type all_hvparams: dict of dict of strings
860 @param all_hvparams: a dictionary mapping hypervisor names to hvparams
862 @return: a dictionary with the same keys as the input dict, and
863 values representing the result of the checks
867 my_name = netutils.Hostname.GetSysName()
868 port = netutils.GetDaemonPort(constants.NODED)
869 vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
871 _VerifyHypervisors(what, vm_capable, result, all_hvparams)
872 _VerifyHvparams(what, vm_capable, result)
874 if constants.NV_FILELIST in what:
875 fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
876 what[constants.NV_FILELIST]))
877 result[constants.NV_FILELIST] = \
878 dict((vcluster.MakeVirtualPath(key), value)
879 for (key, value) in fingerprints.items())
881 if constants.NV_NODELIST in what:
882 (nodes, bynode) = what[constants.NV_NODELIST]
884 # Add nodes from other groups (different for each node)
886 nodes.extend(bynode[my_name])
891 random.shuffle(nodes)
893 # Try to contact all nodes
896 success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
900 result[constants.NV_NODELIST] = val
902 if constants.NV_NODENETTEST in what:
903 result[constants.NV_NODENETTEST] = tmp = {}
904 my_pip = my_sip = None
905 for name, pip, sip in what[constants.NV_NODENETTEST]:
911 tmp[my_name] = ("Can't find my own primary/secondary IP"
914 for name, pip, sip in what[constants.NV_NODENETTEST]:
916 if not netutils.TcpPing(pip, port, source=my_pip):
917 fail.append("primary")
919 if not netutils.TcpPing(sip, port, source=my_sip):
920 fail.append("secondary")
922 tmp[name] = ("failure using the %s interface(s)" %
925 if constants.NV_MASTERIP in what:
926 # FIXME: add checks on incoming data structures (here and in the
927 # rest of the function)
928 master_name, master_ip = what[constants.NV_MASTERIP]
929 if master_name == my_name:
930 source = constants.IP4_ADDRESS_LOCALHOST
933 result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
936 if constants.NV_USERSCRIPTS in what:
937 result[constants.NV_USERSCRIPTS] = \
938 [script for script in what[constants.NV_USERSCRIPTS]
939 if not utils.IsExecutable(script)]
941 if constants.NV_OOB_PATHS in what:
942 result[constants.NV_OOB_PATHS] = tmp = []
943 for path in what[constants.NV_OOB_PATHS]:
947 tmp.append("error stating out of band helper: %s" % err)
949 if stat.S_ISREG(st.st_mode):
950 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
953 tmp.append("out of band helper %s is not executable" % path)
955 tmp.append("out of band helper %s is not a file" % path)
957 if constants.NV_LVLIST in what and vm_capable:
959 val = GetVolumeList(utils.ListVolumeGroups().keys())
962 result[constants.NV_LVLIST] = val
964 _VerifyInstanceList(what, vm_capable, result, all_hvparams)
966 if constants.NV_VGLIST in what and vm_capable:
967 result[constants.NV_VGLIST] = utils.ListVolumeGroups()
969 if constants.NV_PVLIST in what and vm_capable:
970 check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
971 val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
972 filter_allocatable=False,
973 include_lvs=check_exclusive_pvs)
974 if check_exclusive_pvs:
975 result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
977 # Avoid sending useless data on the wire
979 result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)
981 if constants.NV_VERSION in what:
982 result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
983 constants.RELEASE_VERSION)
985 _VerifyNodeInfo(what, vm_capable, result, all_hvparams)
987 if constants.NV_DRBDVERSION in what and vm_capable:
989 drbd_version = DRBD8.GetProcInfo().GetVersionString()
990 except errors.BlockDeviceError, err:
991 logging.warning("Can't get DRBD version", exc_info=True)
992 drbd_version = str(err)
993 result[constants.NV_DRBDVERSION] = drbd_version
995 if constants.NV_DRBDLIST in what and vm_capable:
997 used_minors = drbd.DRBD8.GetUsedDevs()
998 except errors.BlockDeviceError, err:
999 logging.warning("Can't get used minors list", exc_info=True)
1000 used_minors = str(err)
1001 result[constants.NV_DRBDLIST] = used_minors
1003 if constants.NV_DRBDHELPER in what and vm_capable:
1006 payload = drbd.DRBD8.GetUsermodeHelper()
1007 except errors.BlockDeviceError, err:
1008 logging.error("Can't get DRBD usermode helper: %s", str(err))
1011 result[constants.NV_DRBDHELPER] = (status, payload)
1013 if constants.NV_NODESETUP in what:
1014 result[constants.NV_NODESETUP] = tmpr = []
1015 if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
1016 tmpr.append("The sysfs filesytem doesn't seem to be mounted"
1017 " under /sys, missing required directories /sys/block"
1018 " and /sys/class/net")
1019 if (not os.path.isdir("/proc/sys") or
1020 not os.path.isfile("/proc/sysrq-trigger")):
1021 tmpr.append("The procfs filesystem doesn't seem to be mounted"
1022 " under /proc, missing required directory /proc/sys and"
1023 " the file /proc/sysrq-trigger")
1025 if constants.NV_TIME in what:
1026 result[constants.NV_TIME] = utils.SplitTime(time.time())
1028 if constants.NV_OSLIST in what and vm_capable:
1029 result[constants.NV_OSLIST] = DiagnoseOS()
1031 if constants.NV_BRIDGES in what and vm_capable:
1032 result[constants.NV_BRIDGES] = [bridge
1033 for bridge in what[constants.NV_BRIDGES]
1034 if not utils.BridgeExists(bridge)]
1036 if what.get(constants.NV_FILE_STORAGE_PATHS) == my_name:
1037 result[constants.NV_FILE_STORAGE_PATHS] = \
1038 bdev.ComputeWrongFileStoragePaths()
1043 def GetBlockDevSizes(devices):
1044 """Return the size of the given block devices
1047 @param devices: list of block device nodes to query
1050 dictionary of all block devices under /dev (key). The value is their
1053 {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
1056 DEV_PREFIX = "/dev/"
1059 for devpath in devices:
1060 if not utils.IsBelowDir(DEV_PREFIX, devpath):
1064 st = os.stat(devpath)
1065 except EnvironmentError, err:
1066 logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
1069 if stat.S_ISBLK(st.st_mode):
1070 result = utils.RunCmd(["blockdev", "--getsize64", devpath])
1072 # We don't want to fail, just do not list this device as available
1073 logging.warning("Cannot get size for block device %s", devpath)
1076 size = int(result.stdout) / (1024 * 1024)
1077 blockdevs[devpath] = size
1081 def GetVolumeList(vg_names):
1082 """Compute list of logical volumes and their size.
1084 @type vg_names: list
1085 @param vg_names: the volume groups whose LVs we should list, or
1086 empty for all volume groups
1089 dictionary of all partions (key) with value being a tuple of
1090 their size (in MiB), inactive and online status::
1092 {'xenvg/test1': ('20.06', True, True)}
1094 in case of errors, a string is returned with the error
1102 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
1103 "--separator=%s" % sep,
1104 "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
1106 _Fail("Failed to list logical volumes, lvs output: %s", result.output)
1108 for line in result.stdout.splitlines():
1110 match = _LVSLINE_REGEX.match(line)
1112 logging.error("Invalid line returned from lvs output: '%s'", line)
1114 vg_name, name, size, attr = match.groups()
1115 inactive = attr[4] == "-"
1116 online = attr[5] == "o"
1117 virtual = attr[0] == "v"
1119 # we don't want to report such volumes as existing, since they
1120 # don't really hold data
1122 lvs[vg_name + "/" + name] = (size, inactive, online)
1127 def ListVolumeGroups():
1128 """List the volume groups and their size.
1131 @return: dictionary with keys volume name and values the
1135 return utils.ListVolumeGroups()
# NodeVolumes: enumerate every logical volume on this node by parsing
# `lvs` output.  NOTE(review): the `def` line and several statements of
# this function are not visible in this sampled view.
1139 """List all volumes on this node.
1143 A list of dictionaries, each having four keys:
1144 - name: the logical volume name,
1145 - size: the size of the logical volume
1146 - dev: the physical device on which the LV lives
1147 - vg: the volume group to which it belongs
1149 In case of errors, we return an empty list and log the
1152 Note that since a logical volume can live on multiple physical
1153 volumes, the resulting list might include a logical volume
# Output fields are requested explicitly so parsing does not depend on
# the lvs default column set.
1157 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
1159 "--options=lv_name,lv_size,devices,vg_name"])
1161 _Fail("Failed to list logical volumes, lvs output: %s",
# The `devices` field looks like "/dev/sda1(0)"; keep only the path
# before the "(extent)" suffix.
1165 return dev.split("(")[0]
1167 def handle_dev(dev):
1168 return [parse_dev(x) for x in dev.split(",")]
1171 line = [v.strip() for v in line]
# One dict per (LV, physical device) pair, hence the inner loop over
# handle_dev(): an LV spanning several PVs appears multiple times.
1172 return [{"name": line[0], "size": line[1],
1173 "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
1176 for line in result.stdout.splitlines():
# A well-formed data row has at least four "|"-separated fields.
1177 if line.count("|") >= 3:
1178 all_devs.extend(map_line(line.split("|")))
1180 logging.warning("Strange line in the output from lvs: '%s'", line)
1184 def BridgesExist(bridges_list):
1185 """Check if a list of bridges exist on the current node.
# @type bridges_list: list of strings (bridge names)
1188 @return: C{True} if all of them exist, C{False} otherwise
# Collect every absent bridge first so the failure message reports all
# of them at once instead of just the first.
1192 for bridge in bridges_list:
1193 if not utils.BridgeExists(bridge):
1194 missing.append(bridge)
# _Fail raises RPCFail; reached only when `missing` is non-empty.
1197 _Fail("Missing bridges %s", utils.CommaJoin(missing))
1200 def GetInstanceListForHypervisor(hname, hvparams=None,
1201 get_hv_fn=hypervisor.GetHypervisor):
1202 """Provides a list of instances of the given hypervisor.
# @type hname: string
1205 @param hname: name of the hypervisor
1206 @type hvparams: dict of strings
1207 @param hvparams: hypervisor parameters for the given hypervisor
1208 @type get_hv_fn: function
1209 @param get_hv_fn: function that returns a hypervisor for the given hypervisor
1210 name; optional parameter to increase testability
1213 @return: a list of all running instances on the current node
1214 - instance1.example.com
1215 - instance2.example.com
# get_hv_fn is injected (defaults to hypervisor.GetHypervisor) so unit
# tests can substitute a fake hypervisor factory.
1220 hv = get_hv_fn(hname)
1221 names = hv.ListInstances(hvparams=hvparams)
1222 results.extend(names)
# Python 2 except syntax; any hypervisor failure is converted into an
# RPCFail with the full traceback (exc=True).
1223 except errors.HypervisorError, err:
1224 _Fail("Error enumerating instances (hypervisor %s): %s",
1225 hname, err, exc=True)
1229 def GetInstanceList(hypervisor_list, all_hvparams=None,
1230 get_hv_fn=hypervisor.GetHypervisor):
1231 """Provides a list of instances.
1233 @type hypervisor_list: list
1234 @param hypervisor_list: the list of hypervisors to query information
1235 @type all_hvparams: dict of dict of strings
1236 @param all_hvparams: a dictionary mapping hypervisor types to respective
1237 cluster-wide hypervisor parameters
1238 @type get_hv_fn: function
1239 @param get_hv_fn: function that returns a hypervisor for the given hypervisor
1240 name; optional parameter to increase testability
1243 @return: a list of all running instances on the current node
1244 - instance1.example.com
1245 - instance2.example.com
# Aggregates per-hypervisor results; note all_hvparams is indexed
# directly, so despite the None default every listed hypervisor must
# have an entry — TODO confirm callers always pass a full dict.
1249 for hname in hypervisor_list:
1250 hvparams = all_hvparams[hname]
1251 results.extend(GetInstanceListForHypervisor(hname, hvparams=hvparams,
1252 get_hv_fn=get_hv_fn))
1256 def GetInstanceInfo(instance, hname):
1257 """Gives back the information about an instance as a dictionary.
1259 @type instance: string
1260 @param instance: the instance name
# @type hname: string
1262 @param hname: the hypervisor type of the instance
1265 @return: dictionary with the following keys:
1266 - memory: memory size of instance (int)
1267 - state: xen state of instance (string)
1268 - time: cpu time of instance (float)
1269 - vcpus: the number of vcpus (int)
1274 iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
# iinfo is the hypervisor's positional tuple; indices 2..5 map to
# memory, vcpus, state and cpu time respectively.  A None iinfo
# (instance not found) yields an empty result.
1275 if iinfo is not None:
1276 output["memory"] = iinfo[2]
1277 output["vcpus"] = iinfo[3]
1278 output["state"] = iinfo[4]
1279 output["time"] = iinfo[5]
1284 def GetInstanceMigratable(instance):
1285 """Computes whether an instance can be migrated.
1287 @type instance: L{objects.Instance}
1288 @param instance: object representing the instance to be checked.
1291 @return: tuple of (result, description) where:
1292 - result: whether the instance can be migrated or not
1293 - description: a description of the issue, if relevant
1296 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1297 iname = instance.name
# A non-running instance cannot be migrated: hard failure.
1298 if iname not in hyper.ListInstances(instance.hvparams):
1299 _Fail("Instance %s is not running", iname)
# Missing disk symlinks are only logged, not fatal: the check is
# advisory and the links can be recreated at migration time.
1301 for idx in range(len(instance.disks)):
1302 link_name = _GetBlockDevSymlinkPath(iname, idx)
1303 if not os.path.islink(link_name):
1304 logging.warning("Instance %s is missing symlink %s for disk %d",
1305 iname, link_name, idx)
1308 def GetAllInstancesInfo(hypervisor_list):
1309 """Gather data about all instances.
1311 This is the equivalent of L{GetInstanceInfo}, except that it
1312 computes data for all instances at once, thus being faster if one
1313 needs data about more than one instance.
1315 @type hypervisor_list: list
1316 @param hypervisor_list: list of hypervisors to query for instance data
1319 @return: dictionary of instance: data, with data having the following keys:
1320 - memory: memory size of instance (int)
1321 - state: xen state of instance (string)
1322 - time: cpu time of instance (float)
1323 - vcpus: the number of vcpus
1328 for hname in hypervisor_list:
1329 iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
# Each hypervisor reports positional tuples; the second field (id) is
# unused here.
1331 for name, _, memory, vcpus, state, times in iinfo:
1339 # we only check static parameters, like memory and vcpus,
1340 # and not state and time which can change between the
1341 # invocations of the different hypervisors
# An instance reported by two hypervisors with differing static
# parameters indicates it is running twice — treated as fatal.
1342 for key in "memory", "vcpus":
1343 if value[key] != output[name][key]:
1344 _Fail("Instance %s is running twice"
1345 " with different parameters", name)
1346 output[name] = value
1351 def _InstanceLogName(kind, os_name, instance, component):
1352 """Compute the OS log filename for a given instance and operation.
1354 The instance name and os name are passed in as strings since not all
1355 operations have these as part of an instance object.
# @type kind: string
1358 @param kind: the operation type (e.g. add, import, etc.)
1359 @type os_name: string
1360 @param os_name: the os name
1361 @type instance: string
1362 @param instance: the name of the instance being imported/added/etc.
1363 @type component: string or None
1364 @param component: the name of the component of the instance being
1368 # TODO: Use tempfile.mkstemp to create unique filename
# A "/" in component would escape the log directory via PathJoin.
1370 assert "/" not in component
1371 c_msg = "-%s" % component
# Timestamp suffix keeps successive operations from clobbering each
# other's logs (though not guaranteed unique; see TODO above).
1374 base = ("%s-%s-%s%s-%s.log" %
1375 (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1376 return utils.PathJoin(pathutils.LOG_OS_DIR, base)
1379 def InstanceOsAdd(instance, reinstall, debug):
1380 """Add an OS to an instance.
1382 @type instance: L{objects.Instance}
1383 @param instance: Instance whose OS is to be installed
1384 @type reinstall: boolean
1385 @param reinstall: whether this is an instance reinstall
1386 @type debug: integer
1387 @param debug: debug level, passed to the OS scripts
1391 inst_os = OSFromDisk(instance.os)
1393 create_env = OSEnvironment(instance, inst_os, debug)
# Reinstalls are signalled to the OS script via the environment.
1395 create_env["INSTANCE_REINSTALL"] = "1"
1397 logfile = _InstanceLogName("add", instance.os, instance.name, None)
# reset_env=True: the script runs with a clean environment containing
# only the variables built by OSEnvironment.
1399 result = utils.RunCmd([inst_os.create_script], env=create_env,
1400 cwd=inst_os.path, output=logfile, reset_env=True)
1402 logging.error("os create command '%s' returned error: %s, logfile: %s,"
1403 " output: %s", result.cmd, result.fail_reason, logfile,
# On failure, surface the last 20 log lines in the RPC error; log=False
# avoids duplicating what was just logged above.
1405 lines = [utils.SafeEncode(val)
1406 for val in utils.TailFile(logfile, lines=20)]
1407 _Fail("OS create script failed (%s), last lines in the"
1408 " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1411 def RunRenameInstance(instance, old_name, debug):
1412 """Run the OS rename script for an instance.
1414 @type instance: L{objects.Instance}
1415 @param instance: Instance whose OS is to be installed
1416 @type old_name: string
1417 @param old_name: previous instance name
1418 @type debug: integer
1419 @param debug: debug level, passed to the OS scripts
# @rtype: boolean
1421 @return: the success of the operation
1424 inst_os = OSFromDisk(instance.os)
1426 rename_env = OSEnvironment(instance, inst_os, debug)
1427 rename_env["OLD_INSTANCE_NAME"] = old_name
# Log file is named after both old and new instance names.
1429 logfile = _InstanceLogName("rename", instance.os,
1430 "%s-%s" % (old_name, instance.name), None)
1432 result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1433 cwd=inst_os.path, output=logfile, reset_env=True)
# NOTE(review): message says "os create command" but this runs the
# rename script — looks like a copy-paste from InstanceOsAdd.
1436 logging.error("os create command '%s' returned error: %s output: %s",
1437 result.cmd, result.fail_reason, result.output)
1438 lines = [utils.SafeEncode(val)
1439 for val in utils.TailFile(logfile, lines=20)]
1440 _Fail("OS rename script failed (%s), last lines in the"
1441 " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1444 def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
# _dir exists only so tests can redirect the link directory.
1445 """Returns symlink path for block device.
# Default link directory is pathutils.DISK_LINKS_DIR.
1449 _dir = pathutils.DISK_LINKS_DIR
# Link name is "<instance><separator><idx>".
1451 return utils.PathJoin(_dir,
1453 (instance_name, constants.DISK_SEPARATOR, idx)))
1456 def _SymlinkBlockDev(instance_name, device_path, idx):
1457 """Set up symlinks to an instance's block device.
1459 This is an auxiliary function run when an instance is started (on the primary
1460 node) or when an instance is migrated (on the target node).
# @type instance_name: string
1463 @param instance_name: the name of the target instance
1464 @param device_path: path of the physical block device, on the node
1465 @param idx: the disk index
1466 @return: absolute path to the disk's symlink
1469 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1471 os.symlink(device_path, link_name)
1472 except OSError, err:
# If the link already exists but is stale (not a symlink, or pointing
# to the wrong device), replace it; otherwise keep the existing one.
1473 if err.errno == errno.EEXIST:
1474 if (not os.path.islink(link_name) or
1475 os.readlink(link_name) != device_path):
1476 os.remove(link_name)
1477 os.symlink(device_path, link_name)
1484 def _RemoveBlockDevLinks(instance_name, disks):
1485 """Remove the block device symlinks belonging to the given instance.
# Best-effort cleanup: removal errors are logged, not raised, so one
# broken link does not stop the rest of the teardown.
1488 for idx, _ in enumerate(disks):
1489 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1490 if os.path.islink(link_name):
1492 os.remove(link_name)
1494 logging.exception("Can't remove symlink '%s'", link_name)
1497 def _GatherAndLinkBlockDevs(instance):
1498 """Set up an instance's block device(s).
1500 This is run on the primary node at instance startup. The block
1501 devices must be already assembled.
1503 @type instance: L{objects.Instance}
1504 @param instance: the instance whose disks we should assemble
# @rtype: list
1506 @return: list of (disk_object, device_path)
# @raise errors.BlockDeviceError: if a device cannot be found or linked
1510 for idx, disk in enumerate(instance.disks):
1511 device = _RecursiveFindBD(disk)
# Disks must already be assembled; a missing device is a hard error.
1513 raise errors.BlockDeviceError("Block device '%s' is not set up." %
1517 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1519 raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1522 block_devices.append((disk, link_name))
1524 return block_devices
1527 def StartInstance(instance, startup_paused, reason, store_reason=True):
1528 """Start an instance.
1530 @type instance: L{objects.Instance}
1531 @param instance: the instance object
1532 @type startup_paused: bool
1533 @param startup_paused: pause instance at startup?
1534 @type reason: list of reasons
1535 @param reason: the reason trail for this startup
1536 @type store_reason: boolean
1537 @param store_reason: whether to store the shutdown reason trail on file
1541 running_instances = GetInstanceListForHypervisor(instance.hypervisor,
# Idempotent: starting an already-running instance is a no-op.
1544 if instance.name in running_instances:
1545 logging.info("Instance %s already running, not starting", instance.name)
1549 block_devices = _GatherAndLinkBlockDevs(instance)
1550 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1551 hyper.StartInstance(instance, block_devices, startup_paused)
1553 _StoreInstReasonTrail(instance.name, reason)
1554 except errors.BlockDeviceError, err:
1555 _Fail("Block device error: %s", err, exc=True)
# On a hypervisor failure the disk symlinks created above are removed
# again; on a block-device failure none were created.
1556 except errors.HypervisorError, err:
1557 _RemoveBlockDevLinks(instance.name, instance.disks)
1558 _Fail("Hypervisor error: %s", err, exc=True)
1561 def InstanceShutdown(instance, timeout, reason, store_reason=True):
1562 """Shut an instance down.
1564 @note: this function uses polling with a hardcoded timeout.
1566 @type instance: L{objects.Instance}
1567 @param instance: the instance object
1568 @type timeout: integer
1569 @param timeout: maximum timeout for soft shutdown
1570 @type reason: list of reasons
1571 @param reason: the reason trail for this shutdown
1572 @type store_reason: boolean
1573 @param store_reason: whether to store the shutdown reason trail on file
1577 hv_name = instance.hypervisor
1578 hyper = hypervisor.GetHypervisor(hv_name)
1579 iname = instance.name
# Already stopped: nothing to do (links are still cleaned up below).
1581 if instance.name not in hyper.ListInstances(instance.hvparams):
1582 logging.info("Instance %s not running, doing nothing", iname)
# Local helper class used as the callable for utils.Retry; tried_once
# lets the hypervisor escalate on the second soft-stop attempt.
1587 self.tried_once = False
1590 if iname not in hyper.ListInstances(instance.hvparams):
1594 hyper.StopInstance(instance, retry=self.tried_once)
1596 _StoreInstReasonTrail(instance.name, reason)
1597 except errors.HypervisorError, err:
1598 if iname not in hyper.ListInstances(instance.hvparams):
1599 # if the instance is no longer existing, consider this a
1600 # success and go to cleanup
1603 _Fail("Failed to stop instance %s: %s", iname, err)
1605 self.tried_once = True
# RetryAgain makes utils.Retry poll again until `timeout` expires.
1607 raise utils.RetryAgain()
1610 utils.Retry(_TryShutdown(), 5, timeout)
1611 except utils.RetryTimeout:
1612 # the shutdown did not succeed
1613 logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
# Soft shutdown timed out: escalate to a forced stop (destroy).
1616 hyper.StopInstance(instance, force=True)
1617 except errors.HypervisorError, err:
1618 if iname in hyper.ListInstances(instance.hvparams):
1619 # only raise an error if the instance still exists, otherwise
1620 # the error could simply be "instance ... unknown"!
1621 _Fail("Failed to force stop instance %s: %s", iname, err)
1625 if iname in hyper.ListInstances(instance.hvparams):
1626 _Fail("Could not shutdown instance %s even by destroy", iname)
# Post-shutdown cleanup is best-effort: a failure here is only logged.
1629 hyper.CleanupInstance(instance.name)
1630 except errors.HypervisorError, err:
1631 logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1633 _RemoveBlockDevLinks(iname, instance.disks)
1636 def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
1637 """Reboot an instance.
1639 @type instance: L{objects.Instance}
1640 @param instance: the instance object to reboot
1641 @type reboot_type: str
1642 @param reboot_type: the type of reboot, one of the following
1644 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1645 instance OS, do not recreate the VM
1646 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1647 restart the VM (at the hypervisor level)
1648 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1649 not accepted here, since that mode is handled differently, in
1650 cmdlib, and translates into full stop and start of the
1651 instance (instead of a call_instance_reboot RPC)
1652 @type shutdown_timeout: integer
1653 @param shutdown_timeout: maximum timeout for soft shutdown
1654 @type reason: list of reasons
1655 @param reason: the reason trail for this reboot
1659 running_instances = GetInstanceListForHypervisor(instance.hypervisor,
# Unlike StartInstance, rebooting a stopped instance is an error.
1662 if instance.name not in running_instances:
1663 _Fail("Cannot reboot instance %s that is not running", instance.name)
1665 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1666 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1668 hyper.RebootInstance(instance)
1669 except errors.HypervisorError, err:
1670 _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1671 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
# Hard reboot = stop + start; store_reason=False on both halves so the
# reason trail is recorded exactly once, just below.
1673 InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
1674 result = StartInstance(instance, False, reason, store_reason=False)
1675 _StoreInstReasonTrail(instance.name, reason)
1677 except errors.HypervisorError, err:
1678 _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1680 _Fail("Invalid reboot_type received: %s", reboot_type)
1683 def InstanceBalloonMemory(instance, memory):
1684 """Resize an instance's memory.
1686 @type instance: L{objects.Instance}
1687 @param instance: the instance object
# @type memory: int
1689 @param memory: new memory amount in MB
1693 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1694 running = hyper.ListInstances(instance.hvparams)
# Ballooning a stopped instance is a no-op, only logged.
1695 if instance.name not in running:
1696 logging.info("Instance %s is not running, cannot balloon", instance.name)
1699 hyper.BalloonInstanceMemory(instance, memory)
1700 except errors.HypervisorError, err:
1701 _Fail("Failed to balloon instance memory: %s", err, exc=True)
1704 def MigrationInfo(instance):
1705 """Gather information about an instance to be migrated.
1707 @type instance: L{objects.Instance}
1708 @param instance: the instance definition
# Delegates entirely to the hypervisor; the returned data is opaque to
# this layer and is passed to AcceptInstance on the target node.
1711 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1713 info = hyper.MigrationInfo(instance)
1714 except errors.HypervisorError, err:
1715 _Fail("Failed to fetch migration information: %s", err, exc=True)
1719 def AcceptInstance(instance, info, target):
1720 """Prepare the node to accept an instance.
1722 @type instance: L{objects.Instance}
1723 @param instance: the instance definition
1724 @type info: string/data (opaque)
1725 @param info: migration information, from the source node
1726 @type target: string
1727 @param target: target host (usually ip), on this node
1730 # TODO: why is this required only for DTS_EXT_MIRROR?
1731 if instance.disk_template in constants.DTS_EXT_MIRROR:
1732 # Create the symlinks, as the disks are not active
1735 _GatherAndLinkBlockDevs(instance)
1736 except errors.BlockDeviceError, err:
1737 _Fail("Block device error: %s", err, exc=True)
1739 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1741 hyper.AcceptInstance(instance, info, target)
1742 except errors.HypervisorError, err:
# Roll back the symlinks created above for externally mirrored disks.
1743 if instance.disk_template in constants.DTS_EXT_MIRROR:
1744 _RemoveBlockDevLinks(instance.name, instance.disks)
1745 _Fail("Failed to accept instance: %s", err, exc=True)
1748 def FinalizeMigrationDst(instance, info, success):
1749 """Finalize any preparation to accept an instance.
1751 @type instance: L{objects.Instance}
1752 @param instance: the instance definition
1753 @type info: string/data (opaque)
1754 @param info: migration information, from the source node
1755 @type success: boolean
1756 @param success: whether the migration was a success or a failure
# Counterpart of AcceptInstance, run on the target node after the
# migration attempt finished (successfully or not).
1759 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1761 hyper.FinalizeMigrationDst(instance, info, success)
1762 except errors.HypervisorError, err:
1763 _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1766 def MigrateInstance(instance, target, live):
1767 """Migrates an instance to another node.
1769 @type instance: L{objects.Instance}
1770 @param instance: the instance definition
1771 @type target: string
1772 @param target: the target node name
# @type live: boolean
1774 @param live: whether the migration should be done live or not (the
1775 interpretation of this parameter is left to the hypervisor)
1776 @raise RPCFail: if migration fails for some reason
1779 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1782 hyper.MigrateInstance(instance, target, live)
1783 except errors.HypervisorError, err:
1784 _Fail("Failed to migrate instance: %s", err, exc=True)
1787 def FinalizeMigrationSource(instance, success, live):
1788 """Finalize the instance migration on the source node.
1790 @type instance: L{objects.Instance}
1791 @param instance: the instance definition of the migrated instance
# @type success: boolean
1793 @param success: whether the migration succeeded or not
# @type live: boolean
1795 @param live: whether the user requested a live migration or not
1796 @raise RPCFail: If the execution fails for some reason
1799 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1802 hyper.FinalizeMigrationSource(instance, success, live)
# Deliberately broad catch: any failure here must be reported back to
# the master as an RPC failure rather than crash the node daemon.
1803 except Exception, err: # pylint: disable=W0703
1804 _Fail("Failed to finalize the migration on the source node: %s", err,
1808 def GetMigrationStatus(instance):
1809 """Get the migration status
1811 @type instance: L{objects.Instance}
1812 @param instance: the instance that is being migrated
1813 @rtype: L{objects.MigrationStatus}
1814 @return: the status of the current migration (one of
1815 L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1816 progress info that can be retrieved from the hypervisor
1817 @raise RPCFail: If the migration status cannot be retrieved
1820 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1822 return hyper.GetMigrationStatus(instance)
# Broad catch, as above: convert any error into an RPCFail.
1823 except Exception, err: # pylint: disable=W0703
1824 _Fail("Failed to get migration status: %s", err, exc=True)
1827 def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
1828 """Creates a block device for an instance.
1830 @type disk: L{objects.Disk}
1831 @param disk: the object describing the disk we should create
# @type size: int
1833 @param size: the size of the physical underlying device, in MiB
# @type owner: string
1835 @param owner: the name of the instance for which disk is created,
1836 used for device cache data
1837 @type on_primary: boolean
1838 @param on_primary: indicates if it is the primary node or not
# @type info: string
1840 @param info: string that will be sent to the physical device
1841 creation, used for example to set (LVM) tags on LVs
1842 @type excl_stor: boolean
1843 @param excl_stor: Whether exclusive_storage is active
1845 @return: the new unique_id of the device (this can sometime be
1846 computed only after creation), or None. On secondary nodes,
1847 it's not required to return anything.
1850 # TODO: remove the obsolete "size" argument
1851 # pylint: disable=W0613
# Children (e.g. the LVs under a DRBD device) are assembled first so
# the parent device can be created on top of them.
1854 for child in disk.children:
1856 crdev = _RecursiveAssembleBD(child, owner, on_primary)
1857 except errors.BlockDeviceError, err:
1858 _Fail("Can't assemble device %s: %s", child, err)
1859 if on_primary or disk.AssembleOnSecondary():
1860 # we need the children open in case the device itself has to
1863 # pylint: disable=E1103
1865 except errors.BlockDeviceError, err:
1866 _Fail("Can't make child '%s' read-write: %s", child, err)
1870 device = bdev.Create(disk, clist, excl_stor)
1871 except errors.BlockDeviceError, err:
1872 _Fail("Can't create block device: %s", err)
1874 if on_primary or disk.AssembleOnSecondary():
1877 except errors.BlockDeviceError, err:
1878 _Fail("Can't assemble device after creation, unusual event: %s", err)
1879 if on_primary or disk.OpenOnSecondary():
# force=True: the freshly created device must be opened read-write.
1881 device.Open(force=True)
1882 except errors.BlockDeviceError, err:
1883 _Fail("Can't make device r/w after creation, unusual event: %s", err)
1884 DevCacheManager.UpdateCache(device.dev_path, owner,
1885 on_primary, disk.iv_name)
1887 device.SetInfo(info)
1889 return device.unique_id
1892 def _WipeDevice(path, offset, size):
1893 """This function actually wipes the device.
# @type path: string
1895 @param path: The path to the device to wipe
# @type offset: int
1896 @param offset: The offset in MiB in the file
# @type size: int
1897 @param size: The size in MiB to write
1900 # Internal sizes are always in Mebibytes; if the following "dd" command
1901 # should use a different block size the offset and size given to this
1902 # function must be adjusted accordingly before being passed to "dd".
1903 block_size = 1024 * 1024
# oflag=direct bypasses the page cache so the zeroes hit the device.
1905 cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1906 "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
1908 result = utils.RunCmd(cmd)
1911 _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1912 result.fail_reason, result.output)
1915 def BlockdevWipe(disk, offset, size):
1916 """Wipes a block device.
1918 @type disk: L{objects.Disk}
1919 @param disk: the disk object we want to wipe
# @type offset: int
1921 @param offset: The offset in MiB in the file
# @type size: int
1923 @param size: The size in MiB to write
1927 rdev = _RecursiveFindBD(disk)
1928 except errors.BlockDeviceError:
1932 _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1934 # Do cross verify some of the parameters
# Reject negative or out-of-range spans before touching the device.
1936 _Fail("Negative offset")
1938 _Fail("Negative size")
1939 if offset > rdev.size:
1940 _Fail("Offset is bigger than device size")
1941 if (offset + size) > rdev.size:
1942 _Fail("The provided offset and size to wipe is bigger than device size")
1944 _WipeDevice(rdev.dev_path, offset, size)
1947 def BlockdevPauseResumeSync(disks, pause):
1948 """Pause or resume the sync of the block device.
1950 @type disks: list of L{objects.Disk}
1951 @param disks: the disks object we want to pause/resume
# @type pause: bool
1953 @param pause: Whether to pause or resume
# One (bool, error-or-None) entry is appended per disk, so the result
# list is positionally aligned with the input list.
1959 rdev = _RecursiveFindBD(disk)
1960 except errors.BlockDeviceError:
1964 success.append((False, ("Cannot change sync for device %s:"
1965 " device not found" % disk.iv_name)))
1968 result = rdev.PauseResumeSync(pause)
1971 success.append((result, None))
1977 success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1982 def BlockdevRemove(disk):
1983 """Remove a block device.
1985 @note: This is intended to be called recursively.
1987 @type disk: L{objects.Disk}
1988 @param disk: the disk object we should remove
# @rtype: boolean
1990 @return: the success of the operation
1995 rdev = _RecursiveFindBD(disk)
1996 except errors.BlockDeviceError, err:
1997 # probably can't attach
1998 logging.info("Can't attach to device %s in remove", disk)
2000 if rdev is not None:
2001 r_path = rdev.dev_path
2004 except errors.BlockDeviceError, err:
2005 msgs.append(str(err))
# Drop the cache entry once the device itself is gone.
2007 DevCacheManager.RemoveCache(r_path)
# Children are removed recursively; their failures are accumulated in
# `msgs` and reported together at the end.
2010 for child in disk.children:
2012 BlockdevRemove(child)
2013 except RPCFail, err:
2014 msgs.append(str(err))
2017 _Fail("; ".join(msgs))
2020 def _RecursiveAssembleBD(disk, owner, as_primary):
2021 """Activate a block device for an instance.
2023 This is run on the primary and secondary nodes for an instance.
2025 @note: this function is called recursively.
2027 @type disk: L{objects.Disk}
2028 @param disk: the disk we try to assemble
# @type owner: string
2030 @param owner: the name of the instance which owns the disk
2031 @type as_primary: boolean
2032 @param as_primary: if we should make the block device
2035 @return: the assembled device or None (in case no device
2037 @raise errors.BlockDeviceError: in case there is an error
2038 during the activation of the children or the device
# mcn = how many children may legitimately fail to assemble (e.g. a
# degraded mirror); -1 from ChildrenNeeded means all are required.
2044 mcn = disk.ChildrenNeeded()
2046 mcn = 0 # max number of Nones allowed
2048 mcn = len(disk.children) - mcn # max number of Nones
2049 for chld_disk in disk.children:
2051 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
2052 except errors.BlockDeviceError, err:
# Too many failed children: give up; otherwise log and carry on with
# a None placeholder for the failed child.
2053 if children.count(None) >= mcn:
2056 logging.error("Error in child activation (but continuing): %s",
2058 children.append(cdev)
2060 if as_primary or disk.AssembleOnSecondary():
2061 r_dev = bdev.Assemble(disk, children)
2063 if as_primary or disk.OpenOnSecondary():
2065 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
2066 as_primary, disk.iv_name)
2073 def BlockdevAssemble(disk, owner, as_primary, idx):
2074 """Activate a block device for an instance.
2076 This is a wrapper over _RecursiveAssembleBD.
2078 @rtype: str or boolean
2079 @return: a C{/dev/...} path for primary nodes, and
2080 C{True} for secondary nodes
2084 result = _RecursiveAssembleBD(disk, owner, as_primary)
# On the primary the assembled BlockDev is turned into its device path
# and a per-instance symlink is created for it.
2085 if isinstance(result, BlockDev):
2086 # pylint: disable=E1103
2087 result = result.dev_path
2089 _SymlinkBlockDev(owner, result, idx)
2090 except errors.BlockDeviceError, err:
2091 _Fail("Error while assembling disk: %s", err, exc=True)
2092 except OSError, err:
2093 _Fail("Error while symlinking disk: %s", err, exc=True)
2098 def BlockdevShutdown(disk):
2099 """Shut down a block device.
2101 First, if the device is assembled (Attach() is successful), then
2102 the device is shutdown. Then the children of the device are
2105 This function is called recursively. Note that we don't cache the
2106 children or such, as opposed to assemble, shutdown of different
2107 devices doesn't require that the upper device was active.
2109 @type disk: L{objects.Disk}
2110 @param disk: the description of the disk we should
2116 r_dev = _RecursiveFindBD(disk)
2117 if r_dev is not None:
2118 r_path = r_dev.dev_path
# Cache entry is removed after a successful shutdown of the device.
2121 DevCacheManager.RemoveCache(r_path)
2122 except errors.BlockDeviceError, err:
2123 msgs.append(str(err))
# Children are shut down regardless of whether the parent succeeded;
# all errors are collected and reported together.
2126 for child in disk.children:
2128 BlockdevShutdown(child)
2129 except RPCFail, err:
2130 msgs.append(str(err))
2133 _Fail("; ".join(msgs))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  Attaches every device in C{new_cdevs} as a child of
  C{parent_cdev}.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add

  """
  parent = _RecursiveFindBD(parent_cdev)
  if parent is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  # Resolve every child before touching the parent, so a missing device
  # aborts the whole operation without partial changes.
  resolved = []
  for child in new_cdevs:
    resolved.append(_RecursiveFindBD(child))
  if None in resolved:
    _Fail("Can't find new device(s) to add: %s:%s", resolved, new_cdevs)
  parent.AddChildren(resolved)
2155 def BlockdevRemovechildren(parent_cdev, new_cdevs):
2156 """Shrink a mirrored block device.
2158 @type parent_cdev: L{objects.Disk}
2159 @param parent_cdev: the disk from which we should remove children
2160 @type new_cdevs: list of L{objects.Disk}
2161 @param new_cdevs: the list of children which we should remove
2165 parent_bdev = _RecursiveFindBD(parent_cdev)
2166 if parent_bdev is None:
2167 _Fail("Can't find parent device '%s' in remove children", parent_cdev)
# For each child, prefer its static device path; otherwise resolve the
# live block device to obtain the path to detach.
2169 for disk in new_cdevs:
2170 rpath = disk.StaticDevPath()
2172 bd = _RecursiveFindBD(disk)
2174 _Fail("Can't find device %s while removing children", disk)
2176 devs.append(bd.dev_path)
# Sanity check: a static path must be a normalized absolute path.
2178 if not utils.IsNormAbsPath(rpath):
2179 _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
2181 parent_bdev.RemoveChildren(devs)
2184 def BlockdevGetmirrorstatus(disks):
2185 """Get the mirroring status of a list of devices.
2187 @type disks: list of L{objects.Disk}
2188 @param disks: the list of disks which we should query
# @rtype: list
2190 @return: List of L{objects.BlockDevStatus}, one for each disk
2191 @raise errors.BlockDeviceError: if any of the disks cannot be
# Unlike the Multi variant below, a single missing device fails the
# whole call.
2197 rbd = _RecursiveFindBD(dsk)
2199 _Fail("Can't find device %s", dsk)
2201 stats.append(rbd.CombinedSyncStatus())
2206 def BlockdevGetmirrorstatusMulti(disks):
2207 """Get the mirroring status of a list of devices.
2209 @type disks: list of L{objects.Disk}
2210 @param disks: the list of disks which we should query
# @rtype: list of tuples
2212 @return: List of tuples, (bool, status), one for each disk; bool denotes
2213 success/failure, status is L{objects.BlockDevStatus} on success, string
# Per-disk error reporting: a missing or failing device yields a
# (False, message) entry instead of aborting the whole call.
2220 rbd = _RecursiveFindBD(disk)
2222 result.append((False, "Can't find device %s" % disk))
2225 status = rbd.CombinedSyncStatus()
2226 except errors.BlockDeviceError, err:
2227 logging.exception("Error while getting disk status")
2228 result.append((False, str(err)))
2230 result.append((True, status))
# Invariant: exactly one result entry per input disk, in order.
2232 assert len(disks) == len(result)
2237 def _RecursiveFindBD(disk):
2238 """Check if a device is activated.
2240 If so, return information about the real device.
2242 @type disk: L{objects.Disk}
2243 @param disk: the disk object we need to find
# @rtype: L{bdev.BlockDev} or None
2245 @return: None if the device can't be found,
2246 otherwise the device instance
# Children are resolved first, then the device itself is looked up with
# the resolved child list.
2251 for chdisk in disk.children:
2252 children.append(_RecursiveFindBD(chdisk))
2254 return bdev.FindDevice(disk, children)
2257 def _OpenRealBD(disk):
2258 """Opens the underlying block device of a disk.
2260 @type disk: L{objects.Disk}
2261 @param disk: the disk object we want to open
# Fails (RPCFail) if the device is not assembled on this node.
2264 real_disk = _RecursiveFindBD(disk)
2265 if real_disk is None:
2266 _Fail("Block device '%s' is not set up", disk)
2273 def BlockdevFind(disk):
2274 """Check if a device is activated.
2276 If it is, return information about the real device.
2278 @type disk: L{objects.Disk}
2279 @param disk: the disk to find
2280 @rtype: None or objects.BlockDevStatus
2281 @return: None if the disk cannot be found, otherwise the current
# Lookup failures become RPC failures; a simple "not found" result is
# reported as None, not an error.
2286 rbd = _RecursiveFindBD(disk)
2287 except errors.BlockDeviceError, err:
2288 _Fail("Failed to find device: %s", err, exc=True)
2293 return rbd.GetSyncStatus()
2296 def BlockdevGetdimensions(disks):
2297 """Computes the size of the given disks.
2299 If a disk is not found, returns None instead.
2301 @type disks: list of L{objects.Disk}
2302 @param disks: the list of disk to compute the size for
# @rtype: list
2304 @return: list with elements None if the disk cannot be found,
2305 otherwise the pair (size, spindles), where spindles is None if the
2306 device doesn't support that
# Per-disk best effort: a disk that cannot be attached contributes a
# None entry rather than failing the whole call.
2312 rbd = _RecursiveFindBD(cf)
2313 except errors.BlockDeviceError:
2319 result.append(rbd.GetActualDimensions())
2323 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
2324 """Export a block device to a remote node.
2326 @type disk: L{objects.Disk}
2327 @param disk: the description of the disk to export
2328 @type dest_node: str
2329 @param dest_node: the destination node to export to
2330 @type dest_path: str
2331 @param dest_path: the destination path on the target node
2332 @type cluster_name: str
2333 @param cluster_name: the cluster name, needed for SSH hostalias
2337 real_disk = _OpenRealBD(disk)
2339 # the block size on the read dd is 1MiB to match our units
# pipefail is required so a failure of the local dd (left side of the
# pipe) is not masked by a successful ssh exit status.
2340 expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
2341 "dd if=%s bs=1048576 count=%s",
2342 real_disk.dev_path, str(disk.size))
2344 # we set here a smaller block size as, due to ssh buffering, more
2345 # than 64-128k will mostly ignored; we use nocreat to fail if the
2346 # device is not already there or we pass a wrong path; we use
2347 # notrunc to no attempt truncate on an LV device; we use oflag=dsync
2348 # to not buffer too much memory; this means that at best, we flush
2349 # every 64k, which will not be very fast
2350 destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
2351 " oflag=dsync", dest_path)
2353 remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2354 constants.SSH_LOGIN_USER,
2357 # all commands have been checked, so we're safe to combine them
2358 command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
# bash (not sh) is needed for the "pipefail" option used above.
2360 result = utils.RunCmd(["bash", "-c", command])
2363 _Fail("Disk copy command '%s' returned error: %s"
2364 " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file (possibly compressed)
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None
  @raise RPCFail: if the file name is not absolute or not whitelisted

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  # only a fixed whitelist of configuration files may be overwritten
  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)
def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @type oob_program: string
  @param oob_program: The path to the executable oob_program
  @type command: string
  @param command: The command to invoke on oob_program
  @type node: string
  @param node: The node given as an argument to the program
  @type timeout: float
  @param timeout: Timeout after which we kill the oob program

  @return: stdout of the oob program
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS

  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    # one integer API version per line
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of support OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          # valid OS: os_inst is an objects.OS instance
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          # invalid OS: os_inst is the diagnostic message
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type name: string
  @param name: the OS name to look for
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.

  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error message up to the caller unchanged
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    # older API versions do not ship a verify script
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        # optional files may be missing without invalidating the OS
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      # scripts must be executable by their owner
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    # each line is "name help-text"
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type name: string
  @param name: the OS name, possibly including a variant
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    # on failure the payload is the diagnostic message
    _Fail(payload)

  return payload
def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  # advertise the highest API version supported by both sides
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      # fall back to the first supported variant
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  Extends L{OSCoreEnv} with instance-specific variables (disks, NICs,
  hypervisor and backend parameters).

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.netinfo:
      nobj = objects.Network.FromDict(nic.netinfo)
      result.update(nobj.HooksDict("NIC_%d_" % idx))
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
def DiagnoseExtStorage(top_dirs=None):
  """Compute the validity for all ExtStorage Providers.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.ES_SEARCH_PATH})
  @rtype: list of L{objects.ExtStorage}
  @return: a list of tuples (name, path, status, diagnose, parameters)
      for all (potential) ExtStorage Providers under all
      search paths, where:
          - name is the (potential) ExtStorage Provider
          - path is the full path to the ExtStorage Provider
          - status True/False is the validity of the ExtStorage Provider
          - diagnose is the error message for an invalid ExtStorage Provider,
            otherwise empty
          - parameters is a list of (name, help) parameters, if any

  """
  if top_dirs is None:
    top_dirs = pathutils.ES_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the ExtStorage directory %s: %s",
                          dir_name, err)
        break
      for name in f_names:
        es_path = utils.PathJoin(dir_name, name)
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
        if status:
          # valid provider: es_inst is an objects.ExtStorage instance
          diagnose = ""
          parameters = es_inst.supported_parameters
        else:
          # invalid provider: es_inst is the diagnostic message
          diagnose = es_inst
          parameters = []
        result.append((name, es_path, status, diagnose, parameters))

  return result
def BlockdevGrow(disk, amount, dryrun, backingstore):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the errors message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    # recurse into the single backing device of the DRBD disk
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type info: string
  @param info: new 'info' metadata
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the errors message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.SetInfo(info)
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  The config is written to a ".new" directory first and then moved
  over the final destination, so a half-written export is never
  visible under the final name.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
               "%s" % nic.network)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))
  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # atomically replace any previous export with the new one
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  # both the export and the instance sections must be present
  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
3003 """Return a list of exports currently available on this machine.
3006 @return: list of the exports
3009 if os.path.isdir(pathutils.EXPORT_DIR):
3010 return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
3012 _Fail("No exports directory")
def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None
  @raise RPCFail: if the export directory cannot be removed

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)
def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
      rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        # the device path changed, so the cached entry is now stale
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))
def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise
  @raise RPCFail: if file storage is disabled at configure time

  """
  if not (constants.ENABLE_FILE_STORAGE or
          constants.ENABLE_SHARED_FILE_STORAGE):
    _Fail("File storage disabled at configure time")

  # raises if the path is outside the allowed storage directories
  bdev.CheckFileStoragePath(fs_dir)

  return os.path.normpath(fs_dir)
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    # an existing directory is fine; anything else is an error
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified Storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    # refusing to overwrite an existing destination
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check

  @raises RPCFail: if the file is not valid

  """
  # reject any path outside the queue directory (e.g. via "..")
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents (possibly compressed)
  @rtype: boolean
  @return: the success of the operation

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  # both source and destination must stay inside the queue directory
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)
def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the succes of the operation, and message
      which will contain the error details in case we
      failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      # collect all errors so the caller sees the full picture
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)
def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None
  @raise RPCFail: if the parameters are invalid for this hypervisor

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    # log=False: the error is reported back to the caller anyway
    _Fail(str(err), log=False)
def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check
  @raise RPCFail: if any parameter is not supported by the OS

  """
  # supported_parameters holds (name, help) pairs; compare on names only
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  # older API versions have no verify script, so nothing more to check
  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    # log=False: already logged above with full details
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True
  """Demotes the current node from master candidate role.

  """
  # NOTE(review): the enclosing "def DemoteFromMC():" line is not visible
  # in this excerpt.
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  # Refuse to demote while the master daemon is still running locally
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  # Back up the cluster configuration before removing it; a missing file
  # (ENOENT) is acceptable, any other error aborts the demotion.
  # NOTE(review): the opening "try:" line is not visible in this excerpt.
  if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
    utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)
def _GetX509Filenames(cryptodir, name):
  """Return the directory, private key and certificate paths for a cert.

  @type cryptodir: string
  @param cryptodir: Base directory holding per-certificate subdirectories
  @type name: string
  @param name: Certificate name (the subdirectory name)
  @rtype: tuple; (string, string, string)
  @return: certificate directory, private key path, certificate path

  """
  cert_dir = utils.PathJoin(cryptodir, name)
  return (cert_dir,
          utils.PathJoin(cert_dir, _X509_KEY_FILE),
          utils.PathJoin(cert_dir, _X509_CERT_FILE))
def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @type cryptodir: string
  @param cryptodir: Directory in which the certificate is stored
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  # Validity is capped at _MAX_SSL_CERT_VALIDITY (one week)
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  # Each certificate gets its own unique, timestamped directory; the
  # directory basename doubles as the certificate name returned to callers
  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  name = os.path.basename(cert_dir)
  assert len(name) > 5

  (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  # NOTE(review): the enclosing "try:" and the error-path "raise" around
  # these writes are not visible in this excerpt; the rmtree below is the
  # cleanup for a failed write.
  utils.WriteFile(key_file, mode=0400, data=key_pem)
  utils.WriteFile(cert_file, mode=0400, data=cert_pem)

  # Never return private key as it shouldn't leave the node
  return (name, cert_pem)
  shutil.rmtree(cert_dir, ignore_errors=True)
def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes a X509 certificate.

  @type name: string
  @param name: Certificate name
  @type cryptodir: string
  @param cryptodir: Directory containing the certificate

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  # Remove key and certificate files, then the (now empty) directory
  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)
  # NOTE(review): the "os.rmdir(cert_dir)" call and its enclosing "try:"
  # are not visible in this excerpt; the _Fail call below is also missing
  # its argument continuation line.
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments
  @rtype: tuple
  @return: (environment dict, shell command prefix, shell command suffix,
      expected size in MiB or None)

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  # NOTE(review): initialization of env/prefix/suffix/exp_size is not
  # visible in this excerpt.

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    # Resolve symlinks before the containment check below
    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    # Only files under the exports directory may be read/written
    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    # Redirect the transfer stream into (import) or out of (export) the file
    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

    # Retrieve file size
    # NOTE(review): the enclosing "try:" line is not visible in this
    # excerpt
    st = os.stat(filename)
    except EnvironmentError, err:
      logging.error("Can't stat(2) %s: %s", filename, err)
    exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    # NOTE(review): unpacking of the disk argument from ieargs is not
    # visible in this excerpt
    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly ignored; we use nocreat to fail if the device
      # is not already there or we pass a wrong path; we use notrunc to no
      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
      # much memory; this means that at best, we flush every 64k, which will
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
      # NOTE(review): the remaining argument lines of this call are not
      # visible in this excerpt

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   str(1024 * 1024), # 1 MB
      # NOTE(review): remaining arguments of this call are not visible
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    # Build the OS environment the import/export script runs in
    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    # Pipe the stream through the OS script
    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  # NOTE(review): the final "else:" header is not visible in this excerpt
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)
def _CreateImportExportStatusDir(prefix):
  """Create a unique status directory for an import/export daemon.

  @type prefix: string
  @param prefix: Prefix for the directory name
  @rtype: string
  @return: Path of the newly created status directory

  """
  dir_prefix = "%s-%s-" % (prefix, utils.TimestampForFilename())
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR, prefix=dir_prefix)
def StartImportExportDaemon(mode, opts, host, port, instance, component,
  # NOTE(review): the signature continuation line ("ieio, ieioargs):") is
  # not visible in this excerpt.
  """Starts an import or export daemon.

  @param mode: Import/output mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @param host: Remote host for export (None for import)
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  # Import listens locally; no remote endpoint may be given
  if mode == constants.IEM_IMPORT:
    # NOTE(review): the "prefix = ..." assignment is not visible here
    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    # NOTE(review): the "prefix = ..." assignment is not visible here
    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  # NOTE(review): the final "else:" header is not visible in this excerpt
    _Fail("Invalid mode %r", mode)

  # Either both a key name and a CA are given, or neither (in which case
  # the cluster certificate is used for everything)
  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use the node daemon's certificate for both key and cert
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  # NOTE(review): the "else:" header is not visible in this excerpt
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
    # NOTE(review): the continuation argument (the key name) is not visible
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  # NOTE(review): the enclosing "try:" (whose cleanup is the rmtree at the
  # bottom) is not visible in this excerpt.
  status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
  pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
  ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

  if opts.ca_pem is None:
    # Use the node daemon certificate as CA
    ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
  # NOTE(review): the "else:" branch assigning opts.ca_pem is not visible

  # The CA is copied into the private status directory
  utils.WriteFile(ca_file, data=ca, mode=0400)

  # NOTE(review): the "cmd = [" opening line and some list entries are not
  # visible in this excerpt
    pathutils.IMPORT_EXPORT_DAEMON,
    "--key=%s" % key_path,
    "--cert=%s" % cert_path,
    "--ca=%s" % ca_file,

  # NOTE(review): the guards for the following appends ("if host:",
  # "if port:", the IPv6/IPv4 choice, compress/magic options) are not
  # visible in this excerpt.
    cmd.append("--host=%s" % host)
    cmd.append("--port=%s" % port)
    cmd.append("--ipv6")
    cmd.append("--ipv4")
    cmd.append("--compress=%s" % opts.compress)
    cmd.append("--magic=%s" % opts.magic)

  if exp_size is not None:
    cmd.append("--expected-size=%s" % exp_size)

  # NOTE(review): the "if cmd_prefix:" / "if cmd_suffix:" guards are not
  # visible in this excerpt.
    cmd.append("--cmd-prefix=%s" % cmd_prefix)
    cmd.append("--cmd-suffix=%s" % cmd_suffix)

  if mode == constants.IEM_EXPORT:
    # Retry connection a few times when connecting to remote peer
    cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
    cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
  elif opts.connect_timeout is not None:
    assert mode == constants.IEM_IMPORT
    # Overall timeout for establishing connection while listening
    cmd.append("--connect-timeout=%s" % opts.connect_timeout)

  logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

  # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
  # support for receiving a file descriptor for output
  utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
  # NOTE(review): the continuation argument (output file) is not visible

  # The import/export name is simply the status directory name
  return os.path.basename(status_dir)

  # NOTE(review): the "except Exception:" handler header and the re-raise
  # are not visible in this excerpt; the rmtree is the failure cleanup.
    shutil.rmtree(status_dir, ignore_errors=True)
def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if a
      status couldn't be read

  """
  # NOTE(review): the "result = []" initialization and the
  # "for name in names:" loop header are not visible in this excerpt.
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
    # NOTE(review): the continuation argument (the status file name
    # constant) is not visible

    # A missing status file means the daemon is gone; any other error is
    # propagated.
    # NOTE(review): the "try:" line is not visible
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
      # NOTE(review): the "raise" and the None-fallback lines are not
      # visible in this excerpt

      result.append(serializer.LoadJson(data))

  # NOTE(review): the final "return result" is not visible in this excerpt
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  @type name: string
  @param name: Import/export name (status directory basename)

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  # The PID file is read under lock so a half-written file is not trusted
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  # NOTE(review): the "if pid:" guard is not visible in this excerpt
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
    # NOTE(review): the argument continuation line is not visible
    # ESRCH (process already gone) is deliberately ignored
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  @type name: string
  @param name: Import/export name (status directory basename)

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  # NOTE(review): the "if pid:" guard is not visible in this excerpt
    logging.info("Import/export %s is still running with PID %s",
    # NOTE(review): the argument continuation line is not visible
    # Fire-and-forget kill; no waitpid since the daemon is not our child
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @param nodes_ip: presumably a mapping of node names to IP addresses,
      passed through to SetPhysicalID — TODO confirm with callers
  @param disks: list of disk objects to look up
  @return: list of attached block devices (one per disk)

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  # NOTE(review): the "for cf in disks:" loop header is not visible in
  # this excerpt
    cf.SetPhysicalID(my_name, nodes_ip)

  # NOTE(review): the result-list initialization and second loop header
  # are not visible in this excerpt
    rd = _RecursiveFindBD(cf)
    # NOTE(review): the "if rd is None:" guard is not visible
      _Fail("Can't find device %s", cf)
    # NOTE(review): the append and final return are not visible
def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  @param nodes_ip: node name to IP mapping, forwarded to L{_FindDisks}
  @param disks: list of disks whose devices should go standalone

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # Switch every device to standalone mode
  # NOTE(review): the "for rd in bdevs:" loop, "try:" and the
  # DisconnectNet() call are not visible in this excerpt.
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
      # NOTE(review): the argument continuation line is not visible
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  @param nodes_ip: node name to IP mapping, forwarded to L{_FindDisks}
  @param disks: list of disks to reconnect
  @type instance_name: string
  @param instance_name: instance the disks belong to (used for symlinks)
  @type multimaster: boolean
  @param multimaster: whether to reconnect in dual-primary mode

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # NOTE(review): the guard around this symlink loop (presumably
  # "if multimaster:") is not visible in this excerpt — confirm.
  for idx, rd in enumerate(bdevs):
    # NOTE(review): the "try:" line is not visible
      _SymlinkBlockDev(instance_name, rd.dev_path, idx)
    except EnvironmentError, err:
      _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  # NOTE(review): the loop header and "try:" are not visible
    rd.AttachNet(multimaster)
  except errors.BlockDeviceError, err:
    _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  # NOTE(review): the "def _Attach():" helper header is not visible in
  # this excerpt; the lines below are its body, retried by utils.Retry.
    all_connected = True

    # NOTE(review): the per-device loop header is not visible
      stats = rd.GetProcStatus()

      # A device counts as connected when either fully connected or
      # resyncing with its peer
      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        # NOTE(review): the "try:" line is not visible
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  # NOTE(review): the "try:" line is not visible
    # Start with a delay of 100 miliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  # change to primary mode
  # NOTE(review): the guard (presumably "if multimaster:"), the per-device
  # loop and the promoting call are not visible in this excerpt.
  except errors.BlockDeviceError, err:
    _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @param nodes_ip: node name to IP mapping, forwarded to L{_FindDisks}
  @param disks: list of disks to wait for
  @return: tuple of (all done flag, minimum resync percentage)

  """
  # NOTE(review): the docstring terminator and the "def _helper(rd):"
  # header are not visible in this excerpt; the next lines are the helper
  # body passed to utils.Retry.
    stats = rd.GetProcStatus()
    # Keep retrying while the device is neither connected nor resyncing
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    # NOTE(review): the helper's "return stats" is not visible

  bdevs = _FindDisks(nodes_ip, disks)

  # NOTE(review): initialization of the accumulator variables and the
  # "for rd in bdevs:" loop header are not visible in this excerpt.
    # poll each second for 15 seconds
    # NOTE(review): the "try:" line is not visible
    stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()

    if not (stats.is_connected or stats.is_in_resync):
      _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    # A device still resyncing means the overall operation is not done
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
3878 def GetDrbdUsermodeHelper():
3879 """Returns DRBD usermode helper currently configured.
3883 return drbd.DRBD8.GetUsermodeHelper()
3884 except errors.BlockDeviceError, err:
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  @type hypervisor_type: string
  @param hypervisor_type: hypervisor whose PowercycleNode is invoked

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  # NOTE(review): the fork logic (try/os.fork/except OSError) surrounding
  # the next lines is not visible in this excerpt.
  # if we can't fork, we'll pretend that we're in the child process
  # NOTE(review): the parent-process branch header is not visible
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running on ram
  # NOTE(review): the memory-locking call, the delay before powercycling
  # and related lines are not visible in this excerpt.
  except Exception: # pylint: disable=W0703
  hyper.PowercycleNode()
def _VerifyRestrictedCmdName(cmd):
  """Verifies a restricted command name.

  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  # Reject empty/whitespace-only names
  stripped = cmd.strip()
  if not stripped:
    return (False, "Missing command name")

  # Reject anything containing path components
  if os.path.basename(cmd) != cmd:
    return (False, "Invalid command name")

  # Only characters allowed by the plugin mask may appear
  if not constants.EXT_PLUGIN_MASK.match(cmd):
    return (False, "Command name contains forbidden characters")

  return (True, None)
3934 def _CommonRestrictedCmdCheck(path, owner):
3935 """Common checks for restricted command file system directories and files.
3938 @param path: Path to check
3939 @param owner: C{None} or tuple containing UID and GID
3940 @rtype: tuple; (boolean, string or C{os.stat} result)
3941 @return: The tuple's first element is the status; if C{False}, the second
3942 element is an error message string, otherwise it's the result of C{os.stat}
3946 # Default to root as owner
3951 except EnvironmentError, err:
3952 return (False, "Can't stat(2) '%s': %s" % (path, err))
3954 if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
3955 return (False, "Permissions on '%s' are too permissive" % path)
3957 if (st.st_uid, st.st_gid) != owner:
3958 (owner_uid, owner_gid) = owner
3959 return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))
def _VerifyRestrictedCmdDirectory(path, _owner=None):
  """Verifies restricted command directory.

  @type path: string
  @param path: Path to check
  @rtype: tuple; (boolean, string or None)
  @return: C{(True, None)} when the directory passes all checks, otherwise
    C{(False, error message)}

  """
  (ok, res) = _CommonRestrictedCmdCheck(path, _owner)
  if not ok:
    return (False, res)

  # On success res is the stat result; the path must be a directory
  if not stat.S_ISDIR(res.st_mode):
    return (False, "Path '%s' is not a directory" % path)

  return (True, None)
def _VerifyRestrictedCmd(path, cmd, _owner=None):
  """Verifies a whole restricted command and returns its executable filename.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string)
  @return: C{(True, absolute executable path)} on success, otherwise
    C{(False, error message)}

  """
  executable = utils.PathJoin(path, cmd)

  (ok, msg) = _CommonRestrictedCmdCheck(executable, _owner)
  if not ok:
    return (False, msg)

  # Ownership/permissions are fine; the file must also be executable
  if not utils.IsExecutable(executable):
    return (False, "access(2) thinks '%s' can't be executed" % executable)

  return (True, executable)
def _PrepareRestrictedCmd(path, cmd,
                          _verify_dir=_VerifyRestrictedCmdDirectory,
                          _verify_name=_VerifyRestrictedCmdName,
                          _verify_cmd=_VerifyRestrictedCmd):
  """Performs a number of tests on a restricted command.

  The C{_verify_*} parameters exist only to ease unit testing.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @return: Same as L{_VerifyRestrictedCmd}

  """
  # Verify the directory first
  (dir_ok, dir_msg) = _verify_dir(path)
  if not dir_ok:
    return (False, dir_msg)

  # Then the command name itself
  (name_ok, name_msg) = _verify_name(cmd)
  if not name_ok:
    return (False, name_msg)

  # Check actual executable
  return _verify_cmd(path, cmd)
def RunRestrictedCmd(cmd,
                     _lock_timeout=_RCMD_LOCK_TIMEOUT,
                     _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
                     _path=pathutils.RESTRICTED_COMMANDS_DIR,
                     _sleep_fn=time.sleep,
                     _prepare_fn=_PrepareRestrictedCmd,
                     _runcmd_fn=utils.RunCmd,
                     _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
  """Executes a restricted command after performing strict tests.

  The underscore-prefixed keyword arguments exist to ease unit testing.

  @type cmd: string
  @param cmd: Command name
  @rtype: string
  @return: Command output
  @raise RPCFail: In case of an error

  """
  logging.info("Preparing to run restricted command '%s'", cmd)

  # NOTE(review): the "if not _enabled:" guard is not visible in this
  # excerpt
    _Fail("Restricted commands disabled at configure time")

  # NOTE(review): the surrounding try/finally and the cmdresult
  # initialization are not visible in this excerpt.
    # Serialize restricted command execution node-wide via a file lock
    lock = utils.FileLock.Open(_lock_file)
    lock.Exclusive(blocking=True, timeout=_lock_timeout)

    (status, value) = _prepare_fn(_path, cmd)

    # NOTE(review): the "if status:" guard is not visible; on success the
    # command runs in a cleaned environment, releasing the lock right
    # after the fork.
      cmdresult = _runcmd_fn([value], env={}, reset_env=True,
                             postfork_fn=lambda _: lock.Unlock())
    # NOTE(review): the "else:" header is not visible
      logging.error(value)
  except Exception: # pylint: disable=W0703
    # Keep original error in log
    logging.exception("Caught exception")

  if cmdresult is None:
    # Delay the reply for invalid commands — presumably to slow down
    # probing; confirm intent with project history
    logging.info("Sleeping for %0.1f seconds before returning",
                 _RCMD_INVALID_DELAY)
    _sleep_fn(_RCMD_INVALID_DELAY)

    # Do not include original error message in returned error
    _Fail("Executing command '%s' failed" % cmd)
  elif cmdresult.failed or cmdresult.fail_reason:
    _Fail("Restricted command '%s' failed: %s; output: %s",
          cmd, cmdresult.fail_reason, cmdresult.output)

  return cmdresult.output

  # NOTE(review): the enclosing "finally:" header is not visible
  if lock is not None:
    # Release lock at last
    # NOTE(review): the lock-closing call is not visible in this excerpt
def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  # NOTE(review): the "if until is None:" guard is not visible in this
  # excerpt
    logging.info("Received request to no longer pause watcher")
    utils.RemoveFile(_filename)
  # NOTE(review): the "else:" header is not visible
    logging.info("Received request to pause watcher until %s", until)

    if not ht.TNumber(until):
      _Fail("Duration must be numeric")

    # The timestamp is written as a plain integer followed by a newline
    utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    # Only a single node — this one — may be targeted
    assert len(node_list) == 1
    # NOTE(review): the assignment extracting the single node name from
    # node_list is not visible in this excerpt
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script
    @raise errors.ProgrammerError: for invalid input

    """
    # Map the phase to the "<hpath>-{pre,post}.d" directory suffix
    if phase == constants.HOOKS_PHASE_PRE:
    # NOTE(review): the suffix assignments and the final "else:" header of
    # this chain are not visible in this excerpt
    elif phase == constants.HOOKS_PHASE_POST:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    # NOTE(review): the results-list initialization is not visible
    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      # NOTE(review): the early return is not visible in this excerpt

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        # NOTE(review): the output assignment for this branch is not
        # visible
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        # NOTE(review): the "else:" header is not visible
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    # NOTE(review): the final "return results" is not visible in this
    # excerpt
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  # NOTE(review): the "@staticmethod" decorator line is not visible in
  # this excerpt (Run takes no self/cls parameter)
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data
    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
    # NOTE(review): the continuation argument of this call is not visible
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # Input data is handed to the script via a temporary file
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    # NOTE(review): the try/finally block writing idata to fd and removing
    # the temporary file afterwards is only partially visible.
    result = utils.RunCmd([alloc_script, fin_name])
    # NOTE(review): the "if result.failed:" guard is not visible
      _Fail("iallocator module '%s' failed: %s, output '%s'",
            name, result.fail_reason, result.output)

    return result.stdout
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  # All cached paths are relative to this prefix
  _DEV_PREFIX = "/dev/"
  # Directory where the per-device cache files are kept
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  # NOTE(review): the "@classmethod" decorator line is not visible in this
  # excerpt
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    # NOTE(review): the "return fpath" line is not visible in this excerpt

  # NOTE(review): the "@classmethod" decorator line is not visible
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      # NOTE(review): the early return is not visible in this excerpt
    fpath = cls._ConvertPath(dev_path)
    # NOTE(review): the computation of "state" from on_primary and the
    # iv_name guard are only partially visible in this excerpt
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    # NOTE(review): the "try:" line is not visible
    utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  # NOTE(review): the "@classmethod" decorator line is not visible
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      # NOTE(review): the early return is not visible in this excerpt
    fpath = cls._ConvertPath(dev_path)
    # NOTE(review): the "try:" line is not visible
    utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)