4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25 the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27 in the L{_CleanDirectory} function
31 # pylint: disable=E1103
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63 from ganeti import mcpu
64 from ganeti import compat
65 from ganeti import pathutils
66 from ganeti import vcluster
# Path whose contents change on every reboot; used to detect node restarts
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
# Directories that _CleanDirectory is allowed to operate on
# NOTE(review): additional entries and the closing "])" of this literal
# appear to be elided from this copy of the file
_ALLOWED_CLEAN_DIRS = frozenset([
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
# Maximum validity of generated SSL certificates: 7 days, in seconds
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
# Filenames used inside X509 key/cert directories
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
# Status file name for import/export daemons
_IES_STATUS_FILE = "status"
#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
# Actions for the master setup script
_MASTER_START = "start"
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """
def _Fail(msg, *args, **kwargs):
  """Log an error and the raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail: always

  """
  if "log" not in kwargs or kwargs["log"]:  # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    # NOTE(review): the non-traceback logging branch and the final
    # 'raise RPCFail(...)' statement appear to be elided from this copy
  # NOTE(review): the enclosing 'def' line is elided from this copy;
  # from the body this is the simple-store accessor helper
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    # NOTE(review): the 'return content' statement for the uncompressed
    # case appears to be elided from this copy
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  # NOTE(review): presumably inside an 'else:' branch (header elided)
  raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  # refuse to clean anything outside the whitelist of safe targets
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
  # NOTE(review): the _Fail continuation and the early-return body of the
  # isdir check appear to be elided from this copy
  if not os.path.isdir(path):
  # Normalize excluded paths
  exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      # NOTE(review): 'continue' appears to be elided here
    # only plain files are removed; symlinks and subdirs are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  @rtype: frozenset
  @return: the set of file paths UploadFile may accept

  """
  # NOTE(review): additional entries and the closing "])" of this literal
  # appear to be elided from this copy
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
  # every hypervisor contributes its ancillary files (first tuple element)
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)
#: Files accepted by the UploadFile RPC, computed once at import time
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
  # NOTE(review): the enclosing 'def' line is elided from this copy
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  # keep the queue lock file so the daemon's lock stays valid
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
  # NOTE(review): the enclosing 'def' line is elided from this copy
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
      master_netmask
  @raise RPCFail: in case of errors

  """
  # NOTE(review): the enclosing 'try:' and the 'cfg = ...' assignment
  # providing the config accessor appear to be elided from this copy
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  # NOTE(review): the tuple continuation (likely master_netmask) is elided
  return (master_netdev, master_ip, master_node, primary_ip_family,
def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
      environment variables for the hooks. Will get all the parameters of the
      decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  # NOTE(review): the intermediate 'def decorator(fn):' level, the
  # hooks-runner/config setup providing 'hr' and 'cfg', and the final
  # 'return's appear to be elided from this copy
  def wrapper(*args, **kwargs):
    _, myself = ssconf.GetMasterAndMyself()
    nodes = ([myself], [myself]) # these hooks run locally

    env_fn = compat.partial(env_builder_fn, *args, **kwargs)

    hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
                          None, env_fn, logging.warning, cfg.GetClusterName(),
    # pre-hooks may abort the operation; post-hooks run after success
    hm.RunPhase(constants.HOOKS_PHASE_PRE)
    result = fn(*args, **kwargs)
    hm.RunPhase(constants.HOOKS_PHASE_POST)
def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
      address setup script (unused, but necessary per the implementation of the
      _RunLocalHooks decorator)
  @rtype: dict
  @return: environment variables for the master IP hooks

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  # NOTE(review): the 'return {' opener and the closing '}' of the
  # environment dictionary appear to be elided from this copy
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
      L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
      address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
      script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  # NOTE(review): the 'else:' header appears to be elided here
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  # reset_env=True: run the script with only the variables built above
  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  # NOTE(review): presumably guarded by 'if result.failed:' (elided)
    _Fail("Failed to %s the master IP. Script return value: %s" %
          (action, result.exit_code), log=True)
# NOTE(review): the decorator's continuation line (the env builder
# argument) appears to be elided from this copy
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
      address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)
def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """
  # NOTE(review): the 'if no_voting:' guard (and its else branch setting
  # empty arguments) appears to be elided from this copy
    masterd_args = "--no-voting --yes-do-it"

  # NOTE(review): the 'env = {' opener of this dictionary is elided
    "EXTRA_MASTERD_ARGS": masterd_args,

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  # NOTE(review): presumably guarded by 'if result.failed:' (elided)
    msg = "Can't start Ganeti master: %s" % result.output
# NOTE(review): the decorator's continuation line (the env builder
# argument) appears to be elided from this copy
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
      address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)
def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  # NOTE(review): the 'if result.failed:' guard and the logging format
  # continuation appear to be elided from this copy
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  result.cmd, result.exit_code, result.output)
def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device
  @raise RPCFail: if the IP is not up or the ip commands fail

  """
  if old_netmask == netmask:
    # nothing to do; NOTE(review): the 'return' here is elided

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
  # NOTE(review): the _Fail message continuation is elided above

  # add the address with the new netmask before removing the old one,
  # so the master IP stays reachable throughout
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  # NOTE(review): presumably guarded by 'if result.failed:' (elided)
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  # NOTE(review): presumably guarded by 'if result.failed:' (elided)
    _Fail("Could not bring down the master IP address with the old netmask")
def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    # NOTE(review): the 'if not ip:' guard appears to be elided here.
    # Also note RPCFail is instantiated without an explicit 'raise';
    # verify against the full source whether 'raise' was elided too.
      RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    # NOTE(review): the 'if ip:' guard appears to be elided here
      RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
              " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  # NOTE(review): the final 'else:' header appears to be elided here
    RPCFail("Mode not supported")
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)

  # NOTE(review): the guard on modify_ssh_setup and the enclosing 'try:'
  # for the ssh key handling appear to be elided from this copy
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

    utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

    utils.RemoveFile(priv_key)
    utils.RemoveFile(pub_key)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")

  # NOTE(review): the 'try:' header for this best-effort cleanup is elided
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  # NOTE(review): presumably guarded by 'if result.failed:' (elided)
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")
def _GetVgInfo(name):
  """Retrieves information about a LVM volume group.

  @type name: string
  @param name: name of the volume group to query

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = bdev.LogicalVolume.GetVGInfo([name])
  # NOTE(review): the 'if vginfo:' guard, its else branch and the
  # returned result dictionary appear to be elided from this copy
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
def _GetHvInfo(name):
  """Fetch node-level information from the named hypervisor.

  What is reported depends on the hypervisor backend; commonly
  included items are:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) ram in MiB
    - memory_total is the total number of ram in MiB
    - hv_version: the hypervisor version, if available

  """
  hyper = hypervisor.GetHypervisor(name)
  return hyper.GetNodeInfo()
def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a dictionary.

  """
  # NOTE(review): intervening docstring lines and a likely
  # 'if names is None: return None' short-circuit are elided from this copy
  return map(fn, names)
def GetNodeInfo(vg_names, hv_names):
  """Gives back a hash with different information about the node.

  @type vg_names: list of string
  @param vg_names: Names of the volume groups to ask for disk space information
  @type hv_names: list of string
  @param hv_names: Names of the hypervisors to ask for node information
  @rtype: tuple; (string, None/dict, None/dict)
  @return: Tuple containing boot ID, volume group information and hypervisor
      information

  """
  # the boot ID changes on every reboot, letting the master detect restarts
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  vg_info = _GetNamedNodeInfo(vg_names, _GetVgInfo)
  hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)

  return (bootid, vg_info, hv_info)
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: dict
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  # NOTE(review): the 'result = {}' initialization appears to be elided
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      # NOTE(review): the 'try:' header is elided here
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      # NOTE(review): the 'tmp[hv_name] = val' assignment is elided

  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      # NOTE(review): the 'try:' header is elided here
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    # paths are localized/virtualized for vcluster support
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    # NOTE(review): a try/except around this lookup appears to be elided
      nodes.extend(bynode[my_name])

    random.shuffle(nodes)

    # Try to contact all nodes
    # NOTE(review): the 'val = {}' setup and 'for node in nodes:' loop
    # header, plus the per-node success handling, are elided here
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    # first pass: find our own primary/secondary IPs in the list
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      # NOTE(review): the body locating my_pip/my_sip is elided here
    # NOTE(review): the 'if not my_pip:' guard is elided here
      tmp[my_name] = ("Can't find my own primary/secondary IP"
    # NOTE(review): the 'else:' header is elided; second pass pings peers
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        # NOTE(review): 'fail = []' initialization elided
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # NOTE(review): a guard for distinct secondary IPs is elided
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        # NOTE(review): the 'if fail:' guard is elided
          tmp[name] = ("failure using the %s interface(s)" %

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    # NOTE(review): the else branch and the TcpPing continuation are elided
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,

  if constants.NV_USERSCRIPTS in what:
    # report user scripts that are missing or not executable
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not (os.path.exists(script) and os.access(script, os.X_OK))]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      # NOTE(review): the os.stat call inside a try and the exception
      # handler header providing 'err'/'st' are elided here
        tmp.append("error stating out of band helper: %s" % err)
      if stat.S_ISREG(st.st_mode):
        if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
          # NOTE(review): the success append and else headers are elided
          tmp.append("out of band helper %s is not executable" % path)
        tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    # NOTE(review): the 'try:' header and error handler are elided here
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    # NOTE(review): the 'try:' header and error handler are elided here
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what and vm_capable:
    # NOTE(review): the 'try:' header is elided here
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    # NOTE(review): status initialization and the 'try:' header are elided
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      # NOTE(review): the status/payload update on error is elided
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]
  # NOTE(review): the final 'return result' is elided from this copy
def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB, e.g.::

      {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  # NOTE(review): the DEV_PREFIX constant and the result-dict
  # initialization ('blockdevs') appear to be elided from this copy
  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      # NOTE(review): 'continue' appears to be elided here
    # NOTE(review): the 'try:' header is elided here
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      # NOTE(review): 'continue' appears to be elided here
    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      # NOTE(review): presumably guarded by 'if result.failed:' (elided)
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
      # blockdev reports bytes; convert to MiB
      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  # NOTE(review): the final 'return blockdevs' is elided from this copy
def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  # NOTE(review): the 'lvs' dict initialization and the 'sep' separator
  # setup appear to be elided from this copy
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  # NOTE(review): presumably guarded by 'if result.failed:' (elided)
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    # NOTE(review): a line.strip() before matching appears to be elided
    match = _LVSLINE_REGEX.match(line)
    # NOTE(review): the 'if not match:' guard and its 'continue' are elided
      logging.error("Invalid line returned from lvs output: '%s'", line)
    vg_name, name, size, attr = match.groups()
    # lv_attr positions: [0] volume type, [4] state, [5] open/online
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    # NOTE(review): the guard skipping virtual volumes is elided here
    # we don't want to report such volumes as existing, since they
    # don't really hold data
    lvs[vg_name + "/" + name] = (size, inactive, online)
  # NOTE(review): the final 'return lvs' is elided from this copy
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
  # NOTE(review): the enclosing 'def' line is elided from this copy
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  # NOTE(review): the '--separator' argument line of this command is elided
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--options=lv_name,lv_size,devices,vg_name"])
  # NOTE(review): presumably guarded by 'if result.failed:' (elided),
  # and the _Fail message continuation is also elided
    _Fail("Failed to list logical volumes, lvs output: %s",

  # NOTE(review): the 'def parse_dev(dev):' header is elided here
      return dev.split("(")[0]

  # NOTE(review): the 'def handle_dev(dev):' header is elided here
    return [parse_dev(x) for x in dev.split(",")]

  # NOTE(review): the 'def map_line(line):' header is elided here
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  # NOTE(review): the 'all_devs = []' initialization is elided here
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    # NOTE(review): the 'else:' header is elided here
      logging.warning("Strange line in the output from lvs: '%s'", line)
  # NOTE(review): the final 'return all_devs' is elided from this copy
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  # NOTE(review): the 'missing = []' initialization is elided here
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  # NOTE(review): presumably guarded by 'if missing:' (elided)
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  # NOTE(review): the 'results = []' initialization is elided here
  for hname in hypervisor_list:
    # NOTE(review): the 'try:' header is elided here
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)
  # NOTE(review): the final 'return results' is elided from this copy
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  # NOTE(review): the 'output = {}' initialization is elided here
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # hypervisor info tuple positions: 2=memory, 3=vcpus, 4=state, 5=time
    output["memory"] = iinfo[2]
    output["vcpus"] = iinfo[3]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]
  # NOTE(review): the final 'return output' is elided from this copy
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # every disk must already have its symlink in place for migration
  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  # NOTE(review): the 'output = {}' initialization is elided here
  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    # NOTE(review): a guard on iinfo appears to be elided here
      for name, _, memory, vcpus, state, times in iinfo:
        # NOTE(review): the construction of the 'value' dict and the
        # 'if name in output:' duplicate check header are elided here
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value
  # NOTE(review): the final 'return output' is elided from this copy
def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      imported/added/etc.

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  # NOTE(review): the 'if component:' guard and the 'else' branch
  # assigning an empty c_msg appear to be elided from this copy
    assert "/" not in component
    c_msg = "-%s" % component

  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  # NOTE(review): the 'if reinstall:' guard appears to be elided here
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  # reset_env=True: the OS script only sees the environment built above
  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  # NOTE(review): presumably guarded by 'if result.failed:' (elided);
  # the logging.error argument continuation is also elided
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  # reset_env=True: the OS script only sees the environment built above
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  # NOTE(review): presumably guarded by 'if result.failed:' (elided)
    logging.error("os create command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the symlink path for the given instance's disk index."""
  link_basename = "%s%s%d" % (instance_name, constants.DISK_SEPARATOR, idx)
  return utils.PathJoin(pathutils.DISK_LINKS_DIR, link_basename)
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  # NOTE(review): the 'try:' header is elided here
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      # replace an existing link only if it points to the wrong device
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    # NOTE(review): the re-raise branch and the final 'return link_name'
    # appear to be elided from this copy
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  Errors while removing a link are logged but otherwise ignored, since
  this is a best-effort cleanup.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)
1221 def _GatherAndLinkBlockDevs(instance):
1222 """Set up an instance's block device(s).
1224 This is run on the primary node at instance startup. The block
1225 devices must be already assembled.
1227 @type instance: L{objects.Instance}
1228 @param instance: the instance whose disks we shoul assemble
1230 @return: list of (disk_object, device_path)
1234 for idx, disk in enumerate(instance.disks):
1235 device = _RecursiveFindBD(disk)
1237 raise errors.BlockDeviceError("Block device '%s' is not set up." %
1241 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1243 raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1246 block_devices.append((disk, link_name))
1248 return block_devices
1251 def StartInstance(instance, startup_paused):
1252 """Start an instance.
1254 @type instance: L{objects.Instance}
1255 @param instance: the instance object
1256 @type startup_paused: bool
1257 @param instance: pause instance at startup?
1261 running_instances = GetInstanceList([instance.hypervisor])
1263 if instance.name in running_instances:
1264 logging.info("Instance %s already running, not starting", instance.name)
1268 block_devices = _GatherAndLinkBlockDevs(instance)
1269 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1270 hyper.StartInstance(instance, block_devices, startup_paused)
1271 except errors.BlockDeviceError, err:
1272 _Fail("Block device error: %s", err, exc=True)
1273 except errors.HypervisorError, err:
1274 _RemoveBlockDevLinks(instance.name, instance.disks)
1275 _Fail("Hypervisor error: %s", err, exc=True)
1278 def InstanceShutdown(instance, timeout):
1279 """Shut an instance down.
1281 @note: this functions uses polling with a hardcoded timeout.
1283 @type instance: L{objects.Instance}
1284 @param instance: the instance object
1285 @type timeout: integer
1286 @param timeout: maximum timeout for soft shutdown
1290 hv_name = instance.hypervisor
1291 hyper = hypervisor.GetHypervisor(hv_name)
1292 iname = instance.name
1294 if instance.name not in hyper.ListInstances():
1295 logging.info("Instance %s not running, doing nothing", iname)
1300 self.tried_once = False
1303 if iname not in hyper.ListInstances():
1307 hyper.StopInstance(instance, retry=self.tried_once)
1308 except errors.HypervisorError, err:
1309 if iname not in hyper.ListInstances():
1310 # if the instance is no longer existing, consider this a
1311 # success and go to cleanup
1314 _Fail("Failed to stop instance %s: %s", iname, err)
1316 self.tried_once = True
1318 raise utils.RetryAgain()
1321 utils.Retry(_TryShutdown(), 5, timeout)
1322 except utils.RetryTimeout:
1323 # the shutdown did not succeed
1324 logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1327 hyper.StopInstance(instance, force=True)
1328 except errors.HypervisorError, err:
1329 if iname in hyper.ListInstances():
1330 # only raise an error if the instance still exists, otherwise
1331 # the error could simply be "instance ... unknown"!
1332 _Fail("Failed to force stop instance %s: %s", iname, err)
1336 if iname in hyper.ListInstances():
1337 _Fail("Could not shutdown instance %s even by destroy", iname)
1340 hyper.CleanupInstance(instance.name)
1341 except errors.HypervisorError, err:
1342 logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1344 _RemoveBlockDevLinks(iname, instance.disks)
1347 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1348 """Reboot an instance.
1350 @type instance: L{objects.Instance}
1351 @param instance: the instance object to reboot
1352 @type reboot_type: str
1353 @param reboot_type: the type of reboot, one the following
1355 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1356 instance OS, do not recreate the VM
1357 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1358 restart the VM (at the hypervisor level)
1359 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1360 not accepted here, since that mode is handled differently, in
1361 cmdlib, and translates into full stop and start of the
1362 instance (instead of a call_instance_reboot RPC)
1363 @type shutdown_timeout: integer
1364 @param shutdown_timeout: maximum timeout for soft shutdown
1368 running_instances = GetInstanceList([instance.hypervisor])
1370 if instance.name not in running_instances:
1371 _Fail("Cannot reboot instance %s that is not running", instance.name)
1373 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1374 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1376 hyper.RebootInstance(instance)
1377 except errors.HypervisorError, err:
1378 _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1379 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1381 InstanceShutdown(instance, shutdown_timeout)
1382 return StartInstance(instance, False)
1383 except errors.HypervisorError, err:
1384 _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1386 _Fail("Invalid reboot_type received: %s", reboot_type)
1389 def InstanceBalloonMemory(instance, memory):
1390 """Resize an instance's memory.
1392 @type instance: L{objects.Instance}
1393 @param instance: the instance object
1395 @param memory: new memory amount in MB
1399 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1400 running = hyper.ListInstances()
1401 if instance.name not in running:
1402 logging.info("Instance %s is not running, cannot balloon", instance.name)
1405 hyper.BalloonInstanceMemory(instance, memory)
1406 except errors.HypervisorError, err:
1407 _Fail("Failed to balloon instance memory: %s", err, exc=True)
1410 def MigrationInfo(instance):
1411 """Gather information about an instance to be migrated.
1413 @type instance: L{objects.Instance}
1414 @param instance: the instance definition
1417 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1419 info = hyper.MigrationInfo(instance)
1420 except errors.HypervisorError, err:
1421 _Fail("Failed to fetch migration information: %s", err, exc=True)
1425 def AcceptInstance(instance, info, target):
1426 """Prepare the node to accept an instance.
1428 @type instance: L{objects.Instance}
1429 @param instance: the instance definition
1430 @type info: string/data (opaque)
1431 @param info: migration information, from the source node
1432 @type target: string
1433 @param target: target host (usually ip), on this node
1436 # TODO: why is this required only for DTS_EXT_MIRROR?
1437 if instance.disk_template in constants.DTS_EXT_MIRROR:
1438 # Create the symlinks, as the disks are not active
1441 _GatherAndLinkBlockDevs(instance)
1442 except errors.BlockDeviceError, err:
1443 _Fail("Block device error: %s", err, exc=True)
1445 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1447 hyper.AcceptInstance(instance, info, target)
1448 except errors.HypervisorError, err:
1449 if instance.disk_template in constants.DTS_EXT_MIRROR:
1450 _RemoveBlockDevLinks(instance.name, instance.disks)
1451 _Fail("Failed to accept instance: %s", err, exc=True)
1454 def FinalizeMigrationDst(instance, info, success):
1455 """Finalize any preparation to accept an instance.
1457 @type instance: L{objects.Instance}
1458 @param instance: the instance definition
1459 @type info: string/data (opaque)
1460 @param info: migration information, from the source node
1461 @type success: boolean
1462 @param success: whether the migration was a success or a failure
1465 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1467 hyper.FinalizeMigrationDst(instance, info, success)
1468 except errors.HypervisorError, err:
1469 _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1472 def MigrateInstance(instance, target, live):
1473 """Migrates an instance to another node.
1475 @type instance: L{objects.Instance}
1476 @param instance: the instance definition
1477 @type target: string
1478 @param target: the target node name
1480 @param live: whether the migration should be done live or not (the
1481 interpretation of this parameter is left to the hypervisor)
1482 @raise RPCFail: if migration fails for some reason
1485 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1488 hyper.MigrateInstance(instance, target, live)
1489 except errors.HypervisorError, err:
1490 _Fail("Failed to migrate instance: %s", err, exc=True)
1493 def FinalizeMigrationSource(instance, success, live):
1494 """Finalize the instance migration on the source node.
1496 @type instance: L{objects.Instance}
1497 @param instance: the instance definition of the migrated instance
1499 @param success: whether the migration succeeded or not
1501 @param live: whether the user requested a live migration or not
1502 @raise RPCFail: If the execution fails for some reason
1505 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1508 hyper.FinalizeMigrationSource(instance, success, live)
1509 except Exception, err: # pylint: disable=W0703
1510 _Fail("Failed to finalize the migration on the source node: %s", err,
1514 def GetMigrationStatus(instance):
1515 """Get the migration status
1517 @type instance: L{objects.Instance}
1518 @param instance: the instance that is being migrated
1519 @rtype: L{objects.MigrationStatus}
1520 @return: the status of the current migration (one of
1521 L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1522 progress info that can be retrieved from the hypervisor
1523 @raise RPCFail: If the migration status cannot be retrieved
1526 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1528 return hyper.GetMigrationStatus(instance)
1529 except Exception, err: # pylint: disable=W0703
1530 _Fail("Failed to get migration status: %s", err, exc=True)
1533 def BlockdevCreate(disk, size, owner, on_primary, info):
1534 """Creates a block device for an instance.
1536 @type disk: L{objects.Disk}
1537 @param disk: the object describing the disk we should create
1539 @param size: the size of the physical underlying device, in MiB
1541 @param owner: the name of the instance for which disk is created,
1542 used for device cache data
1543 @type on_primary: boolean
1544 @param on_primary: indicates if it is the primary node or not
1546 @param info: string that will be sent to the physical device
1547 creation, used for example to set (LVM) tags on LVs
1549 @return: the new unique_id of the device (this can sometime be
1550 computed only after creation), or None. On secondary nodes,
1551 it's not required to return anything.
1554 # TODO: remove the obsolete "size" argument
1555 # pylint: disable=W0613
1558 for child in disk.children:
1560 crdev = _RecursiveAssembleBD(child, owner, on_primary)
1561 except errors.BlockDeviceError, err:
1562 _Fail("Can't assemble device %s: %s", child, err)
1563 if on_primary or disk.AssembleOnSecondary():
1564 # we need the children open in case the device itself has to
1567 # pylint: disable=E1103
1569 except errors.BlockDeviceError, err:
1570 _Fail("Can't make child '%s' read-write: %s", child, err)
1574 device = bdev.Create(disk, clist)
1575 except errors.BlockDeviceError, err:
1576 _Fail("Can't create block device: %s", err)
1578 if on_primary or disk.AssembleOnSecondary():
1581 except errors.BlockDeviceError, err:
1582 _Fail("Can't assemble device after creation, unusual event: %s", err)
1583 if on_primary or disk.OpenOnSecondary():
1585 device.Open(force=True)
1586 except errors.BlockDeviceError, err:
1587 _Fail("Can't make device r/w after creation, unusual event: %s", err)
1588 DevCacheManager.UpdateCache(device.dev_path, owner,
1589 on_primary, disk.iv_name)
1591 device.SetInfo(info)
1593 return device.unique_id
def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write
  @raise RPCFail: if the wipe command fails

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Do cross verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)
def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disks object we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume
  @return: list of (success, message) pairs, one per disk

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success
1686 def BlockdevRemove(disk):
1687 """Remove a block device.
1689 @note: This is intended to be called recursively.
1691 @type disk: L{objects.Disk}
1692 @param disk: the disk object we should remove
1694 @return: the success of the operation
1699 rdev = _RecursiveFindBD(disk)
1700 except errors.BlockDeviceError, err:
1701 # probably can't attach
1702 logging.info("Can't attach to device %s in remove", disk)
1704 if rdev is not None:
1705 r_path = rdev.dev_path
1708 except errors.BlockDeviceError, err:
1709 msgs.append(str(err))
1711 DevCacheManager.RemoveCache(r_path)
1714 for child in disk.children:
1716 BlockdevRemove(child)
1717 except RPCFail, err:
1718 msgs.append(str(err))
1721 _Fail("; ".join(msgs))
1724 def _RecursiveAssembleBD(disk, owner, as_primary):
1725 """Activate a block device for an instance.
1727 This is run on the primary and secondary nodes for an instance.
1729 @note: this function is called recursively.
1731 @type disk: L{objects.Disk}
1732 @param disk: the disk we try to assemble
1734 @param owner: the name of the instance which owns the disk
1735 @type as_primary: boolean
1736 @param as_primary: if we should make the block device
1739 @return: the assembled device or None (in case no device
1741 @raise errors.BlockDeviceError: in case there is an error
1742 during the activation of the children or the device
1748 mcn = disk.ChildrenNeeded()
1750 mcn = 0 # max number of Nones allowed
1752 mcn = len(disk.children) - mcn # max number of Nones
1753 for chld_disk in disk.children:
1755 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1756 except errors.BlockDeviceError, err:
1757 if children.count(None) >= mcn:
1760 logging.error("Error in child activation (but continuing): %s",
1762 children.append(cdev)
1764 if as_primary or disk.AssembleOnSecondary():
1765 r_dev = bdev.Assemble(disk, children)
1767 if as_primary or disk.OpenOnSecondary():
1769 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1770 as_primary, disk.iv_name)
1777 def BlockdevAssemble(disk, owner, as_primary, idx):
1778 """Activate a block device for an instance.
1780 This is a wrapper over _RecursiveAssembleBD.
1782 @rtype: str or boolean
1783 @return: a C{/dev/...} path for primary nodes, and
1784 C{True} for secondary nodes
1788 result = _RecursiveAssembleBD(disk, owner, as_primary)
1789 if isinstance(result, bdev.BlockDev):
1790 # pylint: disable=E1103
1791 result = result.dev_path
1793 _SymlinkBlockDev(owner, result, idx)
1794 except errors.BlockDeviceError, err:
1795 _Fail("Error while assembling disk: %s", err, exc=True)
1796 except OSError, err:
1797 _Fail("Error while symlinking disk: %s", err, exc=True)
1802 def BlockdevShutdown(disk):
1803 """Shut down a block device.
1805 First, if the device is assembled (Attach() is successful), then
1806 the device is shutdown. Then the children of the device are
1809 This function is called recursively. Note that we don't cache the
1810 children or such, as oppossed to assemble, shutdown of different
1811 devices doesn't require that the upper device was active.
1813 @type disk: L{objects.Disk}
1814 @param disk: the description of the disk we should
1820 r_dev = _RecursiveFindBD(disk)
1821 if r_dev is not None:
1822 r_path = r_dev.dev_path
1825 DevCacheManager.RemoveCache(r_path)
1826 except errors.BlockDeviceError, err:
1827 msgs.append(str(err))
1830 for child in disk.children:
1832 BlockdevShutdown(child)
1833 except RPCFail, err:
1834 msgs.append(str(err))
1837 _Fail("; ".join(msgs))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats
1910 def BlockdevGetmirrorstatusMulti(disks):
1911 """Get the mirroring status of a list of devices.
1913 @type disks: list of L{objects.Disk}
1914 @param disks: the list of disks which we should query
1916 @return: List of tuples, (bool, status), one for each disk; bool denotes
1917 success/failure, status is L{objects.BlockDevStatus} on success, string
1924 rbd = _RecursiveFindBD(disk)
1926 result.append((False, "Can't find device %s" % disk))
1929 status = rbd.CombinedSyncStatus()
1930 except errors.BlockDeviceError, err:
1931 logging.exception("Error while getting disk status")
1932 result.append((False, str(err)))
1934 result.append((True, status))
1936 assert len(disks) == len(result)
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open
  @return: the device instance, opened read/write
  @raise RPCFail: if the device is not set up

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk
1977 def BlockdevFind(disk):
1978 """Check if a device is activated.
1980 If it is, return information about the real device.
1982 @type disk: L{objects.Disk}
1983 @param disk: the disk to find
1984 @rtype: None or objects.BlockDevStatus
1985 @return: None if the disk cannot be found, otherwise a the current
1990 rbd = _RecursiveFindBD(disk)
1991 except errors.BlockDeviceError, err:
1992 _Fail("Failed to find device: %s", err, exc=True)
1997 return rbd.GetSyncStatus()
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.SSH_LOGIN_USER,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)
def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout
2137 def _OSOndiskAPIVersion(os_dir):
2138 """Compute and return the API version of a given OS.
2140 This function will try to read the API version of the OS residing in
2141 the 'os_dir' directory.
2144 @param os_dir: the directory in which we should look for the OS
2146 @return: tuple (status, data) with status denoting the validity and
2147 data holding either the vaid versions or an error message
2150 api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2153 st = os.stat(api_file)
2154 except EnvironmentError, err:
2155 return False, ("Required file '%s' not found under path %s: %s" %
2156 (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2158 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2159 return False, ("File '%s' in %s is not a regular file" %
2160 (constants.OS_API_FILE, os_dir))
2163 api_versions = utils.ReadFile(api_file).splitlines()
2164 except EnvironmentError, err:
2165 return False, ("Error while reading the API version file at %s: %s" %
2166 (api_file, utils.ErrnoOrStr(err)))
2169 api_versions = [int(version.strip()) for version in api_versions]
2170 except (TypeError, ValueError), err:
2171 return False, ("API version(s) can't be converted to integer: %s" %
2174 return True, api_versions
2177 def DiagnoseOS(top_dirs=None):
2178 """Compute the validity for all OSes.
2180 @type top_dirs: list
2181 @param top_dirs: the list of directories in which to
2182 search (if not given defaults to
2183 L{pathutils.OS_SEARCH_PATH})
2184 @rtype: list of L{objects.OS}
2185 @return: a list of tuples (name, path, status, diagnose, variants,
2186 parameters, api_version) for all (potential) OSes under all
2187 search paths, where:
2188 - name is the (potential) OS name
2189 - path is the full path to the OS
2190 - status True/False is the validity of the OS
2191 - diagnose is the error message for an invalid OS, otherwise empty
2192 - variants is a list of supported OS variants, if any
2193 - parameters is a list of (name, help) parameters, if any
2194 - api_version is a list of support OS API versions
2197 if top_dirs is None:
2198 top_dirs = pathutils.OS_SEARCH_PATH
2201 for dir_name in top_dirs:
2202 if os.path.isdir(dir_name):
2204 f_names = utils.ListVisibleFiles(dir_name)
2205 except EnvironmentError, err:
2206 logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2208 for name in f_names:
2209 os_path = utils.PathJoin(dir_name, name)
2210 status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2213 variants = os_inst.supported_variants
2214 parameters = os_inst.supported_parameters
2215 api_versions = os_inst.api_versions
2218 variants = parameters = api_versions = []
2219 result.append((name, os_path, status, diagnose, variants,
2220 parameters, api_versions))
2225 def _TryOSFromDisk(name, base_dir=None):
2226 """Create an OS instance from disk.
2228 This function will return an OS instance if the given name is a
2231 @type base_dir: string
2232 @keyword base_dir: Base directory containing OS installations.
2233 Defaults to a search in all the OS_SEARCH_PATH dirs.
2235 @return: success and either the OS instance if we find a valid one,
2239 if base_dir is None:
2240 os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
2242 os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2245 return False, "Directory for OS %s not found in search path" % name
2247 status, api_versions = _OSOndiskAPIVersion(os_dir)
2250 return status, api_versions
2252 if not constants.OS_API_VERSIONS.intersection(api_versions):
2253 return False, ("API version mismatch for path '%s': found %s, want %s." %
2254 (os_dir, api_versions, constants.OS_API_VERSIONS))
2256 # OS Files dictionary, we will populate it with the absolute path
2257 # names; if the value is True, then it is a required file, otherwise
2259 os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2261 if max(api_versions) >= constants.OS_API_V15:
2262 os_files[constants.OS_VARIANTS_FILE] = False
2264 if max(api_versions) >= constants.OS_API_V20:
2265 os_files[constants.OS_PARAMETERS_FILE] = True
2267 del os_files[constants.OS_SCRIPT_VERIFY]
2269 for (filename, required) in os_files.items():
2270 os_files[filename] = utils.PathJoin(os_dir, filename)
2273 st = os.stat(os_files[filename])
2274 except EnvironmentError, err:
2275 if err.errno == errno.ENOENT and not required:
2276 del os_files[filename]
2278 return False, ("File '%s' under path '%s' is missing (%s)" %
2279 (filename, os_dir, utils.ErrnoOrStr(err)))
2281 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2282 return False, ("File '%s' under path '%s' is not a regular file" %
2285 if filename in constants.OS_SCRIPTS:
2286 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2287 return False, ("File '%s' under path '%s' is not executable" %
2291 if constants.OS_VARIANTS_FILE in os_files:
2292 variants_file = os_files[constants.OS_VARIANTS_FILE]
2295 utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
2296 except EnvironmentError, err:
2297 # we accept missing files, but not other errors
2298 if err.errno != errno.ENOENT:
2299 return False, ("Error while reading the OS variants file at %s: %s" %
2300 (variants_file, utils.ErrnoOrStr(err)))
2303 if constants.OS_PARAMETERS_FILE in os_files:
2304 parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2306 parameters = utils.ReadFile(parameters_file).splitlines()
2307 except EnvironmentError, err:
2308 return False, ("Error while reading the OS parameters file at %s: %s" %
2309 (parameters_file, utils.ErrnoOrStr(err)))
2310 parameters = [v.split(None, 1) for v in parameters]
2312 os_obj = objects.OS(name=name, path=os_dir,
2313 create_script=os_files[constants.OS_SCRIPT_CREATE],
2314 export_script=os_files[constants.OS_SCRIPT_EXPORT],
2315 import_script=os_files[constants.OS_SCRIPT_IMPORT],
2316 rename_script=os_files[constants.OS_SCRIPT_RENAME],
2317 verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2319 supported_variants=variants,
2320 supported_parameters=parameters,
2321 api_versions=api_versions)
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    # payload is the error message in this case
    _Fail(payload)

  return payload
def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  # pick the highest API version both sides understand
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      # no explicit variant requested: fall back to the first supported one
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
2459 def BlockdevGrow(disk, amount, dryrun, backingstore):
2460 """Grow a stack of block devices.
2462 This function is called recursively, with the childrens being the
2463 first ones to resize.
2465 @type disk: L{objects.Disk}
2466 @param disk: the disk to be grown
2467 @type amount: integer
2468 @param amount: the amount (in mebibytes) to grow with
2469 @type dryrun: boolean
2470 @param dryrun: whether to execute the operation in simulation mode
2471 only, without actually increasing the size
2472 @param backingstore: whether to execute the operation on backing storage
2473 only, or on "logical" storage only; e.g. DRBD is logical storage,
2474 whereas LVM, file, RBD are backing storage
2475 @rtype: (status, result)
2476 @return: a tuple with the status of the operation (True/False), and
2477 the errors message if status is False
2480 r_dev = _RecursiveFindBD(disk)
2482 _Fail("Cannot find block device %s", disk)
2485 r_dev.Grow(amount, dryrun, backingstore)
2486 except errors.BlockDeviceError, err:
2487 _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    # snapshot the backing (LVM) device, not the DRBD layer
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  # build in a ".new" directory first, then atomically move into place
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")
2642 def RemoveExport(export):
2643 """Remove an existing export from the node.
2646 @param export: the name of the export to remove
2650 target = utils.PathJoin(pathutils.EXPORT_DIR, export)
2653 shutil.rmtree(target)
2654 except EnvironmentError, err:
2655 _Fail("Error while removing the export: %s", err, exc=True)
2658 def BlockdevRename(devlist):
2659 """Rename a list of block devices.
2661 @type devlist: list of tuples
2662 @param devlist: list of tuples of the form (disk,
2663 new_logical_id, new_physical_id); disk is an
2664 L{objects.Disk} object describing the current disk,
2665 and new logical_id/physical_id is the name we
2668 @return: True if all renames succeeded, False otherwise
2673 for disk, unique_id in devlist:
2674 dev = _RecursiveFindBD(disk)
2676 msgs.append("Can't find device %s in rename" % str(disk))
2680 old_rpath = dev.dev_path
2681 dev.Rename(unique_id)
2682 new_rpath = dev.dev_path
2683 if old_rpath != new_rpath:
2684 DevCacheManager.RemoveCache(old_rpath)
2685 # FIXME: we should add the new cache information here, like:
2686 # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2687 # but we don't have the owner here - maybe parse from existing
2688 # cache? for now, we only lose lvm data when we rename, which
2689 # is less critical than DRBD or MD
2690 except errors.BlockDeviceError, err:
2691 msgs.append("Can't rename device '%s' to '%s': %s" %
2692 (dev, unique_id, err))
2693 logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2696 _Fail("; ".join(msgs))
def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not (constants.ENABLE_FILE_STORAGE or
          constants.ENABLE_SHARED_FILE_STORAGE):
    _Fail("File storage disabled at configure time")

  cfg = _GetConfig()
  fs_dir = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
          utils.IsBelowDir(base_shared, fs_dir)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          fs_dir, base_fstore, base_shared)
  return fs_dir
2727 def CreateFileStorageDir(file_storage_dir):
2728 """Create file storage directory.
2730 @type file_storage_dir: str
2731 @param file_storage_dir: directory to create
2734 @return: tuple with first element a boolean indicating wheter dir
2735 creation was successful or not
2738 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2739 if os.path.exists(file_storage_dir):
2740 if not os.path.isdir(file_storage_dir):
2741 _Fail("Specified storage dir '%s' is not a directory",
2745 os.makedirs(file_storage_dir, 0750)
2746 except OSError, err:
2747 _Fail("Cannot create file storage directory '%s': %s",
2748 file_storage_dir, err, exc=True)
2751 def RemoveFileStorageDir(file_storage_dir):
2752 """Remove file storage directory.
2754 Remove it only if it's empty. If not log an error and return.
2756 @type file_storage_dir: str
2757 @param file_storage_dir: the directory we should cleanup
2758 @rtype: tuple (success,)
2759 @return: tuple of one element, C{success}, denoting
2760 whether the operation was successful
2763 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2764 if os.path.exists(file_storage_dir):
2765 if not os.path.isdir(file_storage_dir):
2766 _Fail("Specified Storage directory '%s' is not a directory",
2768 # deletes dir only if empty, otherwise we want to fail the rpc call
2770 os.rmdir(file_storage_dir)
2771 except OSError, err:
2772 _Fail("Cannot remove file storage directory '%s': %s",
2773 file_storage_dir, err)
2776 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2777 """Rename the file storage directory.
2779 @type old_file_storage_dir: str
2780 @param old_file_storage_dir: the current path
2781 @type new_file_storage_dir: str
2782 @param new_file_storage_dir: the name we should rename to
2783 @rtype: tuple (success,)
2784 @return: tuple of one element, C{success}, denoting
2785 whether the operation was successful
2788 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2789 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2790 if not os.path.exists(new_file_storage_dir):
2791 if os.path.isdir(old_file_storage_dir):
2793 os.rename(old_file_storage_dir, new_file_storage_dir)
2794 except OSError, err:
2795 _Fail("Cannot rename '%s' to '%s': %s",
2796 old_file_storage_dir, new_file_storage_dir, err)
2798 _Fail("Specified storage dir '%s' is not a directory",
2799 old_file_storage_dir)
2801 if os.path.exists(old_file_storage_dir):
2802 _Fail("Cannot rename '%s' to '%s': both locations exist",
2803 old_file_storage_dir, new_file_storage_dir)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check

  @raises RPCFail: if the file is not valid

  """
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)
2844 def JobQueueRename(old, new):
2845 """Renames a job queue file.
2847 This is just a wrapper over os.rename with proper checking.
2850 @param old: the old (actual) file name
2852 @param new: the desired file name
2854 @return: the success of the operation and payload
2857 old = vcluster.LocalizeVirtualPath(old)
2858 new = vcluster.LocalizeVirtualPath(new)
2860 _EnsureJobQueueFile(old)
2861 _EnsureJobQueueFile(new)
2863 getents = runtime.GetEnts()
2865 utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
2866 dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)
2869 def BlockdevClose(instance_name, disks):
2870 """Closes the given block devices.
2872 This means they will be switched to secondary mode (in case of
2875 @param instance_name: if the argument is not empty, the symlinks
2876 of this instance will be removed
2877 @type disks: list of L{objects.Disk}
2878 @param disks: the list of disks to be closed
2879 @rtype: tuple (success, message)
2880 @return: a tuple of success and message, where success
2881 indicates the succes of the operation, and message
2882 which will contain the error details in case we
2888 rd = _RecursiveFindBD(cf)
2890 _Fail("Can't find device %s", cf)
2897 except errors.BlockDeviceError, err:
2898 msg.append(str(err))
2900 _Fail("Can't make devices secondary: %s", ",".join(msg))
2903 _RemoveBlockDevLinks(instance_name, disks)
2906 def ValidateHVParams(hvname, hvparams):
2907 """Validates the given hypervisor parameters.
2909 @type hvname: string
2910 @param hvname: the hypervisor name
2911 @type hvparams: dict
2912 @param hvparams: the hypervisor parameters to be validated
2917 hv_type = hypervisor.GetHypervisor(hvname)
2918 hv_type.ValidateParameters(hvparams)
2919 except errors.HypervisorError, err:
2920 _Fail(str(err), log=False)
def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  # old API versions have no verify script; nothing more to check
  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True
2988 """Demotes the current node from master candidate role.
2991 # try to ensure we're not the master by mistake
2992 master, myself = ssconf.GetMasterAndMyself()
2993 if master == myself:
2994 _Fail("ssconf status shows I'm the master node, will not demote")
2996 result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
2997 if not result.failed:
2998 _Fail("The master daemon is running, will not demote")
3001 if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
3002 utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
3003 except EnvironmentError, err:
3004 if err.errno != errno.ENOENT:
3005 _Fail("Error while backing up cluster file: %s", err, exc=True)
3007 utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)
def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  @type cryptodir: string
  @param cryptodir: directory holding the per-certificate subdirectories
  @type name: string
  @param name: certificate (subdirectory) name
  @rtype: tuple; (string, string, string)
  @return: certificate directory, key file path and certificate file path

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
3019 def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3020 """Creates a new X509 certificate for SSL/TLS.
3023 @param validity: Validity in seconds
3024 @rtype: tuple; (string, string)
3025 @return: Certificate name and public part
3028 (key_pem, cert_pem) = \
3029 utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
3030 min(validity, _MAX_SSL_CERT_VALIDITY))
3032 cert_dir = tempfile.mkdtemp(dir=cryptodir,
3033 prefix="x509-%s-" % utils.TimestampForFilename())
3035 name = os.path.basename(cert_dir)
3036 assert len(name) > 5
3038 (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3040 utils.WriteFile(key_file, mode=0400, data=key_pem)
3041 utils.WriteFile(cert_file, mode=0400, data=cert_pem)
3043 # Never return private key as it shouldn't leave the node
3044 return (name, cert_pem)
3046 shutil.rmtree(cert_dir, ignore_errors=True)
3050 def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3051 """Removes a X509 certificate.
3054 @param name: Certificate name
3057 (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3059 utils.RemoveFile(key_file)
3060 utils.RemoveFile(cert_file)
3064 except EnvironmentError, err:
3065 _Fail("Cannot remove certificate directory '%s': %s",
3069 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3070 """Returns the command for the requested input/output.
3072 @type instance: L{objects.Instance}
3073 @param instance: The instance object
3074 @param mode: Import/export mode
3075 @param ieio: Input/output type
3076 @param ieargs: Input/output arguments
3079 assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3086 if ieio == constants.IEIO_FILE:
3087 (filename, ) = ieargs
3089 if not utils.IsNormAbsPath(filename):
3090 _Fail("Path '%s' is not normalized or absolute", filename)
3092 real_filename = os.path.realpath(filename)
3093 directory = os.path.dirname(real_filename)
3095 if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3096 _Fail("File '%s' is not under exports directory '%s': %s",
3097 filename, pathutils.EXPORT_DIR, real_filename)
3100 utils.Makedirs(directory, mode=0750)
3102 quoted_filename = utils.ShellQuote(filename)
3104 if mode == constants.IEM_IMPORT:
3105 suffix = "> %s" % quoted_filename
3106 elif mode == constants.IEM_EXPORT:
3107 suffix = "< %s" % quoted_filename
3109 # Retrieve file size
3111 st = os.stat(filename)
3112 except EnvironmentError, err:
3113 logging.error("Can't stat(2) %s: %s", filename, err)
3115 exp_size = utils.BytesToMebibyte(st.st_size)
3117 elif ieio == constants.IEIO_RAW_DISK:
3120 real_disk = _OpenRealBD(disk)
3122 if mode == constants.IEM_IMPORT:
3123 # we set here a smaller block size as, due to transport buffering, more
3124 # than 64-128k will mostly ignored; we use nocreat to fail if the device
3125 # is not already there or we pass a wrong path; we use notrunc to no
3126 # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3127 # much memory; this means that at best, we flush every 64k, which will
3129 suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3130 " bs=%s oflag=dsync"),
3134 elif mode == constants.IEM_EXPORT:
3135 # the block size on the read dd is 1MiB to match our units
3136 prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3138 str(1024 * 1024), # 1 MB
3140 exp_size = disk.size
3142 elif ieio == constants.IEIO_SCRIPT:
3143 (disk, disk_index, ) = ieargs
3145 assert isinstance(disk_index, (int, long))
3147 real_disk = _OpenRealBD(disk)
3149 inst_os = OSFromDisk(instance.os)
3150 env = OSEnvironment(instance, inst_os)
3152 if mode == constants.IEM_IMPORT:
3153 env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3154 env["IMPORT_INDEX"] = str(disk_index)
3155 script = inst_os.import_script
3157 elif mode == constants.IEM_EXPORT:
3158 env["EXPORT_DEVICE"] = real_disk.dev_path
3159 env["EXPORT_INDEX"] = str(disk_index)
3160 script = inst_os.export_script
3162 # TODO: Pass special environment only to script
3163 script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3165 if mode == constants.IEM_IMPORT:
3166 suffix = "| %s" % script_cmd
3168 elif mode == constants.IEM_EXPORT:
3169 prefix = "%s |" % script_cmd
3171 # Let script predict size
3172 exp_size = constants.IE_CUSTOM_SIZE
3175 _Fail("Invalid %s I/O mode %r", mode, ieio)
3177 return (env, prefix, suffix, exp_size)
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  @type prefix: string
  @param prefix: prefix for the directory name
  @rtype: string
  @return: path of the newly created status directory

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))
3189 def StartImportExportDaemon(mode, opts, host, port, instance, component,
3191 """Starts an import or export daemon.
3193 @param mode: Import/output mode
3194 @type opts: L{objects.ImportExportOptions}
3195 @param opts: Daemon options
3197 @param host: Remote host for export (None for import)
3199 @param port: Remote port for export (None for import)
3200 @type instance: L{objects.Instance}
3201 @param instance: Instance object
3202 @type component: string
3203 @param component: which part of the instance is transferred now,
3205 @param ieio: Input/output type
3206 @param ieioargs: Input/output arguments
3209 if mode == constants.IEM_IMPORT:
3212 if not (host is None and port is None):
3213 _Fail("Can not specify host or port on import")
3215 elif mode == constants.IEM_EXPORT:
3218 if host is None or port is None:
3219 _Fail("Host and port must be specified for an export")
3222 _Fail("Invalid mode %r", mode)
3224 if (opts.key_name is None) ^ (opts.ca_pem is None):
3225 _Fail("Cluster certificate can only be used for both key and CA")
3227 (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3228 _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3230 if opts.key_name is None:
3232 key_path = pathutils.NODED_CERT_FILE
3233 cert_path = pathutils.NODED_CERT_FILE
3234 assert opts.ca_pem is None
3236 (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
3238 assert opts.ca_pem is not None
3240 for i in [key_path, cert_path]:
3241 if not os.path.exists(i):
3242 _Fail("File '%s' does not exist" % i)
3244 status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3246 status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3247 pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3248 ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3250 if opts.ca_pem is None:
3252 ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
3257 utils.WriteFile(ca_file, data=ca, mode=0400)
3260 pathutils.IMPORT_EXPORT_DAEMON,
3262 "--key=%s" % key_path,
3263 "--cert=%s" % cert_path,
3264 "--ca=%s" % ca_file,
3268 cmd.append("--host=%s" % host)
3271 cmd.append("--port=%s" % port)
3274 cmd.append("--ipv6")
3276 cmd.append("--ipv4")
3279 cmd.append("--compress=%s" % opts.compress)
3282 cmd.append("--magic=%s" % opts.magic)
3284 if exp_size is not None:
3285 cmd.append("--expected-size=%s" % exp_size)
3288 cmd.append("--cmd-prefix=%s" % cmd_prefix)
3291 cmd.append("--cmd-suffix=%s" % cmd_suffix)
3293 if mode == constants.IEM_EXPORT:
3294 # Retry connection a few times when connecting to remote peer
3295 cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3296 cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3297 elif opts.connect_timeout is not None:
3298 assert mode == constants.IEM_IMPORT
3299 # Overall timeout for establishing connection while listening
3300 cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3302 logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3304 # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3305 # support for receiving a file descriptor for output
3306 utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3309 # The import/export name is simply the status directory name
3310 return os.path.basename(status_dir)
3313 shutil.rmtree(status_dir, ignore_errors=True)
3317 def GetImportExportStatus(names):
3318 """Returns import/export daemon status.
3320 @type names: sequence
3321 @param names: List of names
3322 @rtype: List of dicts
3323 @return: Returns a list of the state of each named import/export or None if a
3324 status couldn't be read
3330 status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
3334 data = utils.ReadFile(status_file)
3335 except EnvironmentError, err:
3336 if err.errno != errno.ENOENT:
3344 result.append(serializer.LoadJson(data))
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  @type name: string
  @param name: import/export (status directory) name

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    # the process may already be gone; that's fine
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  @type name: string
  @param name: import/export (status directory) name

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @type nodes_ip: dict
  @param nodes_ip: node name to IP mapping used to resolve the disks
  @type disks: list of L{objects.Disk}
  @param disks: the disks to look up
  @return: list of found block devices
  @raise RPCFail: if any disk cannot be found

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs
3404 def DrbdDisconnectNet(nodes_ip, disks):
3405 """Disconnects the network on a list of drbd devices.
3408 bdevs = _FindDisks(nodes_ip, disks)
3414 except errors.BlockDeviceError, err:
3415 _Fail("Can't change network configuration to standalone mode: %s",
# NOTE(review): partial extract -- several "try:"/"for" header lines and the
# final primary-mode loop body are missing from this view; only the visible
# statements are documented. Code lines are left byte-identical.
3419 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3420   """Attaches the network on a list of drbd devices.
     # resolve local block devices for the disks
3423   bdevs = _FindDisks(nodes_ip, disks)
       # create the instance symlinks for each device; guard appears to be
       # conditional on multimaster (header lines not visible) -- TODO confirm
3426     for idx, rd in enumerate(bdevs):
3428         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3429       except EnvironmentError, err:
3430         _Fail("Can't create symlink: %s", err)
3431   # reconnect disks, switch to new master configuration and if
3432   # needed primary mode
3435       rd.AttachNet(multimaster)
3436     except errors.BlockDeviceError, err:
3437       _Fail("Can't change network configuration: %s", err)
3439   # wait until the disks are connected; we need to retry the re-attach
3440   # if the device becomes standalone, as this might happen if the one
3441   # node disconnects and reconnects in a different mode before the
3442   # other node reconnects; in this case, one or both of the nodes will
3443   # decide it has wrong configuration and switch to standalone
     # _Attach is the retry callback used with utils.Retry below (its "def"
     # line is not visible in this extract)
3446     all_connected = True
3449       stats = rd.GetProcStatus()
         # a device counts as connected when it is either fully connected
         # or still resyncing
3451       all_connected = (all_connected and
3452                        (stats.is_connected or stats.is_in_resync))
3454       if stats.is_standalone:
3455         # peer had different config info and this node became
3456         # standalone, even though this should not happen with the
3457         # new staged way of changing disk configs
3459           rd.AttachNet(multimaster)
3460         except errors.BlockDeviceError, err:
3461           _Fail("Can't change network configuration: %s", err)
3463     if not all_connected:
3464       raise utils.RetryAgain()
3467     # Start with a delay of 100 miliseconds and go up to 5 seconds
     # overall deadline: 2 minutes for all devices to leave standalone
3468     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3469   except utils.RetryTimeout:
3470     _Fail("Timeout in disk reconnecting")
3473     # change to primary mode
     # (the Open() call this "except" guards is not visible in this extract)
3477     except errors.BlockDeviceError, err:
3478       _Fail("Can't change to primary mode: %s", err)
# NOTE(review): partial extract -- the "def _helper" line, the initializers
# for "alldone"/"min_resync" and the loop header over "bdevs" are missing
# from this view. Code lines are left byte-identical.
3481 def DrbdWaitSync(nodes_ip, disks):
3482   """Wait until DRBDs have synchronized.
     # retry callback body: re-raise RetryAgain until the device is either
     # connected or actively resyncing
3486     stats = rd.GetProcStatus()
3487     if not (stats.is_connected or stats.is_in_resync):
3488       raise utils.RetryAgain()
     # resolve local block devices for the disks
3491   bdevs = _FindDisks(nodes_ip, disks)
3497       # poll each second for 15 seconds
3498       stats = utils.Retry(_helper, 1, 15, args=[rd])
3499     except utils.RetryTimeout:
         # on timeout, take one last status snapshot before deciding
3500       stats = rd.GetProcStatus()
3502     if not (stats.is_connected or stats.is_in_resync):
3503       _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
       # aggregate: done only when no device is still resyncing
3504     alldone = alldone and (not stats.is_in_resync)
3505     if stats.sync_percent is not None:
         # track the slowest device's progress
3506       min_resync = min(min_resync, stats.sync_percent)
     # returns (all devices synced?, minimum sync percentage seen)
3508   return (alldone, min_resync)
# NOTE(review): partial extract -- the "try:" line and the _Fail call inside
# the "except" branch are missing from this view.
3511 def GetDrbdUsermodeHelper():
3512   """Returns DRBD usermode helper currently configured.
     # delegates to the bdev layer; errors presumably reported via _Fail
     # (the handler body is not visible here)
3516     return bdev.BaseDRBD.GetUsermodeHelper()
3517   except errors.BlockDeviceError, err:
# NOTE(review): partial extract -- the fork call, the delay before reboot and
# several guard lines are missing from this view. Code lines byte-identical.
3521 def PowercycleNode(hypervisor_type):
3522   """Hard-powercycle the node.
3524   Because we need to return first, and schedule the powercycle in the
3525   background, we won't be able to report failures nicely.
     # the hypervisor implements the actual powercycle mechanism
3528   hyper = hypervisor.GetHypervisor(hypervisor_type)
3532     # if we can't fork, we'll pretend that we're in the child process
     # parent path: answer the RPC immediately, child does the reboot
3535   return "Reboot scheduled in 5 seconds"
3536   # ensure the child is running on ram
     # best-effort: any failure while pinning to RAM is deliberately ignored
3539   except Exception: # pylint: disable=W0703
3542   hyper.PowercycleNode()
# NOTE(review): partial extract -- docstring closers, the "suffix = ..."
# assignments, "results = []" and the return of RunHooks are among the lines
# missing from this view. Code lines are left byte-identical.
3545 class HooksRunner(object):
3548   This class is instantiated on the node side (ganeti-noded) and not
3552   def __init__(self, hooks_base_dir=None):
3553     """Constructor for hooks runner.
3555     @type hooks_base_dir: str or None
3556     @param hooks_base_dir: if not None, this overrides the
3557         L{pathutils.HOOKS_BASE_DIR} (useful for unittests)
3560     if hooks_base_dir is None:
3561       hooks_base_dir = pathutils.HOOKS_BASE_DIR
3562     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3564     self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3566   def RunLocalHooks(self, node_list, hpath, phase, env):
3567     """Check that the hooks will be run only locally and then run them.
       # only a single node is acceptable; "node" is presumably bound from
       # node_list on a line not visible in this extract
3570     assert len(node_list) == 1
3572     _, myself = ssconf.GetMasterAndMyself()
3573     assert node == myself
3575     results = self.RunHooks(hpath, phase, env)
3577     # Return values in the form expected by HooksMaster
3578     return {node: (None, False, results)}
3580   def RunHooks(self, hpath, phase, env):
3581     """Run the scripts in the hooks directory.
3584     @param hpath: the path to the hooks directory which
3587     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3588         L{constants.HOOKS_PHASE_POST}
3590     @param env: dictionary with the environment for the hook
3592     @return: list of 3-element tuples:
3594       - script result, either L{constants.HKR_SUCCESS} or
3595         L{constants.HKR_FAIL}
3596       - output of the script
3598     @raise errors.ProgrammerError: for invalid input
       # map phase -> directory suffix; the "suffix = ..." assignments under
       # each branch are not visible in this extract
3602     if phase == constants.HOOKS_PHASE_PRE:
3604     elif phase == constants.HOOKS_PHASE_POST:
3607       _Fail("Unknown hooks phase '%s'", phase)
       # hooks live in e.g. "<hpath>-pre.d" under the base dir
3609     subdir = "%s-%s.d" % (hpath, suffix)
3610     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3614     if not os.path.isdir(dir_name):
3615       # for non-existing/non-dirs, we simply exit instead of logging a
3616       # warning at every operation
       # run every script in the directory with a clean environment
3619     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
       # translate RunParts statuses into hook result codes
3621     for (relname, relstatus, runresult) in runparts_results:
3622       if relstatus == constants.RUNPARTS_SKIP:
3623         rrval = constants.HKR_SKIP
3625       elif relstatus == constants.RUNPARTS_ERR:
3626         rrval = constants.HKR_FAIL
3627         output = "Hook script execution error: %s" % runresult
3628       elif relstatus == constants.RUNPARTS_RUN:
3629         if runresult.failed:
3630           rrval = constants.HKR_FAIL
3632           rrval = constants.HKR_SUCCESS
3633         output = utils.SafeEncode(runresult.output.strip())
3634       results.append(("%s/%s" % (subdir, relname), rrval, output))
# NOTE(review): partial extract -- the @staticmethod decorator, the write of
# "idata" to the temp file and the try/finally cleanup of "fin_name" are
# among the lines missing from this view. Code lines byte-identical.
3639 class IAllocatorRunner(object):
3640   """IAllocator runner.
3642   This class is instantiated on the node side (ganeti-noded) and not on
3647   def Run(name, idata):
3648     """Run an iallocator script.
3651     @param name: the iallocator script name
3653     @param idata: the allocator input data
3656     @return: two element tuple of:
3658       - either error message or stdout of allocator (for success)
       # locate the script in the configured search path
3661     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3663     if alloc_script is None:
3664       _Fail("iallocator module '%s' not found in the search path", name)
       # idata is presumably written to this temp file before the run --
       # the write/close lines are not visible in this extract
3666     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3670       result = utils.RunCmd([alloc_script, fin_name])
3672         _Fail("iallocator module '%s' failed: %s, output '%s'",
3673               name, result.fail_reason, result.output)
3677     return result.stdout
# NOTE(review): partial extract -- the @classmethod decorators, the
# "return fpath" of _ConvertPath and the "state" computation in UpdateCache
# are among the lines missing from this view. Code lines byte-identical.
3680 class DevCacheManager(object):
3681   """Simple class for managing a cache of block device information.
     # device paths are keyed relative to this prefix
3684   _DEV_PREFIX = "/dev/"
3685   _ROOT_DIR = pathutils.BDEV_CACHE_DIR
3688   def _ConvertPath(cls, dev_path):
3689     """Converts a /dev/name path to the cache file name.
3691     This replaces slashes with underscores and strips the /dev
3692     prefix. It then returns the full path to the cache file.
3695     @param dev_path: the C{/dev/} path name
3697     @return: the converted path name
3700     if dev_path.startswith(cls._DEV_PREFIX):
3701       dev_path = dev_path[len(cls._DEV_PREFIX):]
       # remaining slashes become underscores so the cache file is flat
3702     dev_path = dev_path.replace("/", "_")
3703     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3707   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3708     """Updates the cache information for a given device.
3711     @param dev_path: the pathname of the device
3713     @param owner: the owner (instance name) of the device
3714     @type on_primary: bool
3715     @param on_primary: whether this is the primary
3718     @param iv_name: the instance-visible name of the
3719         device, as in objects.Disk.iv_name
       # best-effort guard: a None path is logged, not raised
3724     if dev_path is None:
3725       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3727     fpath = cls._ConvertPath(dev_path)
       # "state" is presumably derived from on_primary on a line not
       # visible in this extract -- TODO confirm
3733       iv_name = "not_visible"
3734     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
       # cache-write failures are logged and swallowed: the cache is
       # advisory, not authoritative
3736       utils.WriteFile(fpath, data=fdata)
3737     except EnvironmentError, err:
3738       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3741   def RemoveCache(cls, dev_path):
3742     """Remove data for a dev_path.
3744     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3745     path name and logging.
3748     @param dev_path: the pathname of the device
3753     if dev_path is None:
3754       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3756     fpath = cls._ConvertPath(dev_path)
       # removal failures are likewise logged and swallowed; note the log
       # text says "update" although this is the remove path
3758       utils.RemoveFile(fpath)
3759     except EnvironmentError, err:
3760       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)