4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25 the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27 in the L{_CleanDirectory} function
31 # pylint: disable=E1103
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63 from ganeti import mcpu
64 from ganeti import compat
65 from ganeti import pathutils
66 from ganeti import vcluster
69 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
70 _ALLOWED_CLEAN_DIRS = frozenset([
72 pathutils.JOB_QUEUE_ARCHIVE_DIR,
74 pathutils.CRYPTO_KEYS_DIR,
76 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
77 _X509_KEY_FILE = "key"
78 _X509_CERT_FILE = "cert"
79 _IES_STATUS_FILE = "status"
83 #: Valid LVS output line regex
84 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
86 # Actions for the master setup script
87 _MASTER_START = "start"
90 #: Maximum file permissions for remote command directory and executables
91 _RCMD_MAX_MODE = (stat.S_IRWXU |
92 stat.S_IRGRP | stat.S_IXGRP |
93 stat.S_IROTH | stat.S_IXOTH)
95 #: Delay before returning an error for remote commands
96 _RCMD_INVALID_DELAY = 10
98 #: How long to wait to acquire lock for remote commands (shorter than
99 #: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
100 #: command requests arrive
101 _RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """
112 def _Fail(msg, *args, **kwargs):
113 """Log an error and the raise an RPCFail exception.
115 This exception is then handled specially in the ganeti daemon and
116 turned into a 'failed' return type. As such, this function is a
117 useful shortcut for logging the error and returning it to the master
121 @param msg: the text of the exception
127 if "log" not in kwargs or kwargs["log"]: # if we should log this error
128 if "exc" in kwargs and kwargs["exc"]:
129 logging.exception(msg)
136 """Simple wrapper to return a SimpleStore.
138 @rtype: L{ssconf.SimpleStore}
139 @return: a SimpleStore instance
142 return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Build an SSH runner for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a newly-constructed SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
158 def _Decompress(data):
159 """Unpacks data compressed by the RPC client.
161 @type data: list or tuple
162 @param data: Data sent by RPC client
164 @return: Decompressed data
167 assert isinstance(data, (list, tuple))
168 assert len(data) == 2
169 (encoding, content) = data
170 if encoding == constants.RPC_ENCODING_NONE:
172 elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
173 return zlib.decompress(base64.b64decode(content))
175 raise AssertionError("Unknown data encoding")
178 def _CleanDirectory(path, exclude=None):
179 """Removes all regular files in a directory.
182 @param path: the directory to clean
184 @param exclude: list of files to be excluded, defaults
188 if path not in _ALLOWED_CLEAN_DIRS:
189 _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
192 if not os.path.isdir(path):
197 # Normalize excluded paths
198 exclude = [os.path.normpath(i) for i in exclude]
200 for rel_name in utils.ListVisibleFiles(path):
201 full_name = utils.PathJoin(path, rel_name)
202 if full_name in exclude:
204 if os.path.isfile(full_name) and not os.path.islink(full_name):
205 utils.RemoveFile(full_name)
208 def _BuildUploadFileList():
209 """Build the list of allowed upload files.
211 This is abstracted so that it's built only once at module import time.
214 allowed_files = set([
215 pathutils.CLUSTER_CONF_FILE,
217 pathutils.SSH_KNOWN_HOSTS_FILE,
218 pathutils.VNC_PASSWORD_FILE,
219 pathutils.RAPI_CERT_FILE,
220 pathutils.SPICE_CERT_FILE,
221 pathutils.SPICE_CACERT_FILE,
222 pathutils.RAPI_USERS_FILE,
223 pathutils.CONFD_HMAC_KEY,
224 pathutils.CLUSTER_DOMAIN_SECRET_FILE,
227 for hv_name in constants.HYPER_TYPES:
228 hv_class = hypervisor.GetHypervisorClass(hv_name)
229 allowed_files.update(hv_class.GetAncillaryFiles()[0])
231 assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
232 "Allowed file storage paths should never be uploaded via RPC"
234 return frozenset(allowed_files)
237 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
241 """Removes job queue files and archived jobs.
247 _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
248 _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
252 """Returns master information.
254 This is an utility function to compute master information, either
255 for consumption here or from the node daemon.
258 @return: master_netdev, master_ip, master_name, primary_ip_family,
260 @raise RPCFail: in case of errors
265 master_netdev = cfg.GetMasterNetdev()
266 master_ip = cfg.GetMasterIP()
267 master_netmask = cfg.GetMasterNetmask()
268 master_node = cfg.GetMasterNode()
269 primary_ip_family = cfg.GetPrimaryIPFamily()
270 except errors.ConfigurationError, err:
271 _Fail("Cluster configuration incomplete: %s", err, exc=True)
272 return (master_netdev, master_ip, master_node, primary_ip_family,
276 def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
277 """Decorator that runs hooks before and after the decorated function.
279 @type hook_opcode: string
280 @param hook_opcode: opcode of the hook
281 @type hooks_path: string
282 @param hooks_path: path of the hooks
283 @type env_builder_fn: function
284 @param env_builder_fn: function that returns a dictionary containing the
285 environment variables for the hooks. Will get all the parameters of the
287 @raise RPCFail: in case of pre-hook failure
291 def wrapper(*args, **kwargs):
292 _, myself = ssconf.GetMasterAndMyself()
293 nodes = ([myself], [myself]) # these hooks run locally
295 env_fn = compat.partial(env_builder_fn, *args, **kwargs)
299 hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
300 None, env_fn, logging.warning, cfg.GetClusterName(),
303 hm.RunPhase(constants.HOOKS_PHASE_PRE)
304 result = fn(*args, **kwargs)
305 hm.RunPhase(constants.HOOKS_PHASE_POST)
312 def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
313 """Builds environment variables for master IP hooks.
315 @type master_params: L{objects.MasterNetworkParameters}
316 @param master_params: network parameters of the master
317 @type use_external_mip_script: boolean
318 @param use_external_mip_script: whether to use an external master IP
319 address setup script (unused, but necessary per the implementation of the
320 _RunLocalHooks decorator)
323 # pylint: disable=W0613
324 ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
326 "MASTER_NETDEV": master_params.netdev,
327 "MASTER_IP": master_params.ip,
328 "MASTER_NETMASK": str(master_params.netmask),
329 "CLUSTER_IP_VERSION": str(ver),
335 def _RunMasterSetupScript(master_params, action, use_external_mip_script):
336 """Execute the master IP address setup script.
338 @type master_params: L{objects.MasterNetworkParameters}
339 @param master_params: network parameters of the master
341 @param action: action to pass to the script. Must be one of
342 L{backend._MASTER_START} or L{backend._MASTER_STOP}
343 @type use_external_mip_script: boolean
344 @param use_external_mip_script: whether to use an external master IP
346 @raise backend.RPCFail: if there are errors during the execution of the
350 env = _BuildMasterIpEnv(master_params)
352 if use_external_mip_script:
353 setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
355 setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT
357 result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
360 _Fail("Failed to %s the master IP. Script return value: %s" %
361 (action, result.exit_code), log=True)
364 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
366 def ActivateMasterIp(master_params, use_external_mip_script):
367 """Activate the IP address of the master daemon.
369 @type master_params: L{objects.MasterNetworkParameters}
370 @param master_params: network parameters of the master
371 @type use_external_mip_script: boolean
372 @param use_external_mip_script: whether to use an external master IP
374 @raise RPCFail: in case of errors during the IP startup
377 _RunMasterSetupScript(master_params, _MASTER_START,
378 use_external_mip_script)
381 def StartMasterDaemons(no_voting):
382 """Activate local node as master node.
384 The function will start the master daemons (ganeti-masterd and ganeti-rapi).
386 @type no_voting: boolean
387 @param no_voting: whether to start ganeti-masterd without a node vote
388 but still non-interactively
394 masterd_args = "--no-voting --yes-do-it"
399 "EXTRA_MASTERD_ARGS": masterd_args,
402 result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
404 msg = "Can't start Ganeti master: %s" % result.output
409 @RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
411 def DeactivateMasterIp(master_params, use_external_mip_script):
412 """Deactivate the master IP on this node.
414 @type master_params: L{objects.MasterNetworkParameters}
415 @param master_params: network parameters of the master
416 @type use_external_mip_script: boolean
417 @param use_external_mip_script: whether to use an external master IP
419 @raise RPCFail: in case of errors during the IP turndown
422 _RunMasterSetupScript(master_params, _MASTER_STOP,
423 use_external_mip_script)
426 def StopMasterDaemons():
427 """Stop the master daemons on this node.
429 Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
434 # TODO: log and report back to the caller the error failures; we
435 # need to decide in which case we fail the RPC for this
437 result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
439 logging.error("Could not stop Ganeti master, command %s had exitcode %s"
441 result.cmd, result.exit_code, result.output)
444 def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
445 """Change the netmask of the master IP.
447 @param old_netmask: the old value of the netmask
448 @param netmask: the new value of the netmask
449 @param master_ip: the master IP
450 @param master_netdev: the master network device
453 if old_netmask == netmask:
456 if not netutils.IPAddress.Own(master_ip):
457 _Fail("The master IP address is not up, not attempting to change its"
460 result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
461 "%s/%s" % (master_ip, netmask),
462 "dev", master_netdev, "label",
463 "%s:0" % master_netdev])
465 _Fail("Could not set the new netmask on the master IP address")
467 result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
468 "%s/%s" % (master_ip, old_netmask),
469 "dev", master_netdev, "label",
470 "%s:0" % master_netdev])
472 _Fail("Could not bring down the master IP address with the old netmask")
475 def EtcHostsModify(mode, host, ip):
476 """Modify a host entry in /etc/hosts.
478 @param mode: The mode to operate. Either add or remove entry
479 @param host: The host to operate on
480 @param ip: The ip associated with the entry
483 if mode == constants.ETC_HOSTS_ADD:
485 RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
487 utils.AddHostToEtcHosts(host, ip)
488 elif mode == constants.ETC_HOSTS_REMOVE:
490 RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
491 " parameter is present")
492 utils.RemoveHostFromEtcHosts(host)
494 RPCFail("Mode not supported")
497 def LeaveCluster(modify_ssh_setup):
498 """Cleans up and remove the current node.
500 This function cleans up and prepares the current node to be removed
503 If processing is successful, then it raises an
504 L{errors.QuitGanetiException} which is used as a special case to
505 shutdown the node daemon.
507 @param modify_ssh_setup: boolean
510 _CleanDirectory(pathutils.DATA_DIR)
511 _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
516 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
518 utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
520 utils.RemoveFile(priv_key)
521 utils.RemoveFile(pub_key)
522 except errors.OpExecError:
523 logging.exception("Error while processing ssh files")
526 utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
527 utils.RemoveFile(pathutils.RAPI_CERT_FILE)
528 utils.RemoveFile(pathutils.SPICE_CERT_FILE)
529 utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
530 utils.RemoveFile(pathutils.NODED_CERT_FILE)
531 except: # pylint: disable=W0702
532 logging.exception("Error while removing cluster secrets")
534 result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
536 logging.error("Command %s failed with exitcode %s and error %s",
537 result.cmd, result.exit_code, result.output)
539 # Raise a custom exception (handled in ganeti-noded)
540 raise errors.QuitGanetiException(True, "Shutdown scheduled")
543 def _GetVgInfo(name):
544 """Retrieves information about a LVM volume group.
547 # TODO: GetVGInfo supports returning information for multiple VGs at once
548 vginfo = bdev.LogicalVolume.GetVGInfo([name])
550 vg_free = int(round(vginfo[0][0], 0))
551 vg_size = int(round(vginfo[0][1], 0))
def _GetHvInfo(name):
  """Retrieves node information from a hypervisor.

  The exact contents depend on the hypervisor implementation; common
  items reported by the hypervisors include:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) ram in MiB
    - memory_total is the total number of ram in MiB
    - hv_version: the hypervisor version, if available

  """
  hv = hypervisor.GetHypervisor(name)
  return hv.GetNodeInfo()
579 def _GetNamedNodeInfo(names, fn):
580 """Calls C{fn} for all names in C{names} and returns a dictionary.
588 return map(fn, names)
def GetNodeInfo(vg_names, hv_names):
  """Gives back a hash with different information about the node.

  @type vg_names: list of string
  @param vg_names: Names of the volume groups to ask for disk space information
  @type hv_names: list of string
  @param hv_names: Names of the hypervisors to ask for node information
  @rtype: tuple; (string, None/dict, None/dict)
  @return: Tuple containing boot ID, volume group information and hypervisor
      information

  """
  # The kernel boot ID changes on every reboot, so it identifies the
  # current "incarnation" of this node.
  boot_id = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return (boot_id,
          _GetNamedNodeInfo(vg_names, _GetVgInfo),
          _GetNamedNodeInfo(hv_names, _GetHvInfo))
610 def VerifyNode(what, cluster_name):
611 """Verify the status of the local node.
613 Based on the input L{what} parameter, various checks are done on the
616 If the I{filelist} key is present, this list of
617 files is checksummed and the file/checksum pairs are returned.
619 If the I{nodelist} key is present, we check that we have
620 connectivity via ssh with the target nodes (and check the hostname
623 If the I{node-net-test} key is present, we check that we have
624 connectivity to the given nodes via both primary IP and, if
625 applicable, secondary IPs.
628 @param what: a dictionary of things to check:
629 - filelist: list of files for which to compute checksums
630 - nodelist: list of nodes we should check ssh communication with
631 - node-net-test: list of nodes we should check node daemon port
633 - hypervisor: list with hypervisors to run the verify for
635 @return: a dictionary with the same keys as the input dict, and
636 values representing the result of the checks
640 my_name = netutils.Hostname.GetSysName()
641 port = netutils.GetDaemonPort(constants.NODED)
642 vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
644 if constants.NV_HYPERVISOR in what and vm_capable:
645 result[constants.NV_HYPERVISOR] = tmp = {}
646 for hv_name in what[constants.NV_HYPERVISOR]:
648 val = hypervisor.GetHypervisor(hv_name).Verify()
649 except errors.HypervisorError, err:
650 val = "Error while checking hypervisor: %s" % str(err)
653 if constants.NV_HVPARAMS in what and vm_capable:
654 result[constants.NV_HVPARAMS] = tmp = []
655 for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
657 logging.info("Validating hv %s, %s", hv_name, hvparms)
658 hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
659 except errors.HypervisorError, err:
660 tmp.append((source, hv_name, str(err)))
662 if constants.NV_FILELIST in what:
663 fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
664 what[constants.NV_FILELIST]))
665 result[constants.NV_FILELIST] = \
666 dict((vcluster.MakeVirtualPath(key), value)
667 for (key, value) in fingerprints.items())
669 if constants.NV_NODELIST in what:
670 (nodes, bynode) = what[constants.NV_NODELIST]
672 # Add nodes from other groups (different for each node)
674 nodes.extend(bynode[my_name])
679 random.shuffle(nodes)
681 # Try to contact all nodes
684 success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
688 result[constants.NV_NODELIST] = val
690 if constants.NV_NODENETTEST in what:
691 result[constants.NV_NODENETTEST] = tmp = {}
692 my_pip = my_sip = None
693 for name, pip, sip in what[constants.NV_NODENETTEST]:
699 tmp[my_name] = ("Can't find my own primary/secondary IP"
702 for name, pip, sip in what[constants.NV_NODENETTEST]:
704 if not netutils.TcpPing(pip, port, source=my_pip):
705 fail.append("primary")
707 if not netutils.TcpPing(sip, port, source=my_sip):
708 fail.append("secondary")
710 tmp[name] = ("failure using the %s interface(s)" %
713 if constants.NV_MASTERIP in what:
714 # FIXME: add checks on incoming data structures (here and in the
715 # rest of the function)
716 master_name, master_ip = what[constants.NV_MASTERIP]
717 if master_name == my_name:
718 source = constants.IP4_ADDRESS_LOCALHOST
721 result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
724 if constants.NV_USERSCRIPTS in what:
725 result[constants.NV_USERSCRIPTS] = \
726 [script for script in what[constants.NV_USERSCRIPTS]
727 if not utils.IsExecutable(script)]
729 if constants.NV_OOB_PATHS in what:
730 result[constants.NV_OOB_PATHS] = tmp = []
731 for path in what[constants.NV_OOB_PATHS]:
735 tmp.append("error stating out of band helper: %s" % err)
737 if stat.S_ISREG(st.st_mode):
738 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
741 tmp.append("out of band helper %s is not executable" % path)
743 tmp.append("out of band helper %s is not a file" % path)
745 if constants.NV_LVLIST in what and vm_capable:
747 val = GetVolumeList(utils.ListVolumeGroups().keys())
750 result[constants.NV_LVLIST] = val
752 if constants.NV_INSTANCELIST in what and vm_capable:
753 # GetInstanceList can fail
755 val = GetInstanceList(what[constants.NV_INSTANCELIST])
758 result[constants.NV_INSTANCELIST] = val
760 if constants.NV_VGLIST in what and vm_capable:
761 result[constants.NV_VGLIST] = utils.ListVolumeGroups()
763 if constants.NV_PVLIST in what and vm_capable:
764 result[constants.NV_PVLIST] = \
765 bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
766 filter_allocatable=False)
768 if constants.NV_VERSION in what:
769 result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
770 constants.RELEASE_VERSION)
772 if constants.NV_HVINFO in what and vm_capable:
773 hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
774 result[constants.NV_HVINFO] = hyper.GetNodeInfo()
776 if constants.NV_DRBDLIST in what and vm_capable:
778 used_minors = bdev.DRBD8.GetUsedDevs().keys()
779 except errors.BlockDeviceError, err:
780 logging.warning("Can't get used minors list", exc_info=True)
781 used_minors = str(err)
782 result[constants.NV_DRBDLIST] = used_minors
784 if constants.NV_DRBDHELPER in what and vm_capable:
787 payload = bdev.BaseDRBD.GetUsermodeHelper()
788 except errors.BlockDeviceError, err:
789 logging.error("Can't get DRBD usermode helper: %s", str(err))
792 result[constants.NV_DRBDHELPER] = (status, payload)
794 if constants.NV_NODESETUP in what:
795 result[constants.NV_NODESETUP] = tmpr = []
796 if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
797 tmpr.append("The sysfs filesytem doesn't seem to be mounted"
798 " under /sys, missing required directories /sys/block"
799 " and /sys/class/net")
800 if (not os.path.isdir("/proc/sys") or
801 not os.path.isfile("/proc/sysrq-trigger")):
802 tmpr.append("The procfs filesystem doesn't seem to be mounted"
803 " under /proc, missing required directory /proc/sys and"
804 " the file /proc/sysrq-trigger")
806 if constants.NV_TIME in what:
807 result[constants.NV_TIME] = utils.SplitTime(time.time())
809 if constants.NV_OSLIST in what and vm_capable:
810 result[constants.NV_OSLIST] = DiagnoseOS()
812 if constants.NV_BRIDGES in what and vm_capable:
813 result[constants.NV_BRIDGES] = [bridge
814 for bridge in what[constants.NV_BRIDGES]
815 if not utils.BridgeExists(bridge)]
817 if what.get(constants.NV_FILE_STORAGE_PATHS) == my_name:
818 result[constants.NV_FILE_STORAGE_PATHS] = \
819 bdev.ComputeWrongFileStoragePaths()
824 def GetBlockDevSizes(devices):
825 """Return the size of the given block devices
828 @param devices: list of block device nodes to query
831 dictionary of all block devices under /dev (key). The value is their
834 {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
840 for devpath in devices:
841 if not utils.IsBelowDir(DEV_PREFIX, devpath):
845 st = os.stat(devpath)
846 except EnvironmentError, err:
847 logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
850 if stat.S_ISBLK(st.st_mode):
851 result = utils.RunCmd(["blockdev", "--getsize64", devpath])
853 # We don't want to fail, just do not list this device as available
854 logging.warning("Cannot get size for block device %s", devpath)
857 size = int(result.stdout) / (1024 * 1024)
858 blockdevs[devpath] = size
862 def GetVolumeList(vg_names):
863 """Compute list of logical volumes and their size.
866 @param vg_names: the volume groups whose LVs we should list, or
867 empty for all volume groups
870 dictionary of all partions (key) with value being a tuple of
871 their size (in MiB), inactive and online status::
873 {'xenvg/test1': ('20.06', True, True)}
875 in case of errors, a string is returned with the error
883 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
884 "--separator=%s" % sep,
885 "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
887 _Fail("Failed to list logical volumes, lvs output: %s", result.output)
889 for line in result.stdout.splitlines():
891 match = _LVSLINE_REGEX.match(line)
893 logging.error("Invalid line returned from lvs output: '%s'", line)
895 vg_name, name, size, attr = match.groups()
896 inactive = attr[4] == "-"
897 online = attr[5] == "o"
898 virtual = attr[0] == "v"
900 # we don't want to report such volumes as existing, since they
901 # don't really hold data
903 lvs[vg_name + "/" + name] = (size, inactive, online)
def ListVolumeGroups():
  """List the volume groups and their size.

  Thin RPC-level wrapper; the actual work is done by
  L{utils.ListVolumeGroups}.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  groups = utils.ListVolumeGroups()
  return groups
920 """List all volumes on this node.
924 A list of dictionaries, each having four keys:
925 - name: the logical volume name,
926 - size: the size of the logical volume
927 - dev: the physical device on which the LV lives
928 - vg: the volume group to which it belongs
930 In case of errors, we return an empty list and log the
933 Note that since a logical volume can live on multiple physical
934 volumes, the resulting list might include a logical volume
938 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
940 "--options=lv_name,lv_size,devices,vg_name"])
942 _Fail("Failed to list logical volumes, lvs output: %s",
946 return dev.split("(")[0]
949 return [parse_dev(x) for x in dev.split(",")]
952 line = [v.strip() for v in line]
953 return [{"name": line[0], "size": line[1],
954 "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
957 for line in result.stdout.splitlines():
958 if line.count("|") >= 3:
959 all_devs.extend(map_line(line.split("|")))
961 logging.warning("Strange line in the output from lvs: '%s'", line)
965 def BridgesExist(bridges_list):
966 """Check if a list of bridges exist on the current node.
969 @return: C{True} if all of them exist, C{False} otherwise
973 for bridge in bridges_list:
974 if not utils.BridgeExists(bridge):
975 missing.append(bridge)
978 _Fail("Missing bridges %s", utils.CommaJoin(missing))
981 def GetInstanceList(hypervisor_list):
982 """Provides a list of instances.
984 @type hypervisor_list: list
985 @param hypervisor_list: the list of hypervisors to query information
988 @return: a list of all running instances on the current node
989 - instance1.example.com
990 - instance2.example.com
994 for hname in hypervisor_list:
996 names = hypervisor.GetHypervisor(hname).ListInstances()
997 results.extend(names)
998 except errors.HypervisorError, err:
999 _Fail("Error enumerating instances (hypervisor %s): %s",
1000 hname, err, exc=True)
1005 def GetInstanceInfo(instance, hname):
1006 """Gives back the information about an instance as a dictionary.
1008 @type instance: string
1009 @param instance: the instance name
1011 @param hname: the hypervisor type of the instance
1014 @return: dictionary with the following keys:
1015 - memory: memory size of instance (int)
1016 - state: xen state of instance (string)
1017 - time: cpu time of instance (float)
1018 - vcpus: the number of vcpus (int)
1023 iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
1024 if iinfo is not None:
1025 output["memory"] = iinfo[2]
1026 output["vcpus"] = iinfo[3]
1027 output["state"] = iinfo[4]
1028 output["time"] = iinfo[5]
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hv = hypervisor.GetHypervisor(instance.hypervisor)
  name = instance.name
  if name not in hv.ListInstances():
    _Fail("Instance %s is not running", name)

  # Missing disk symlinks are only warned about, not treated as fatal.
  for idx, _ in enumerate(instance.disks):
    link_name = _GetBlockDevSymlinkPath(name, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      name, link_name, idx)
1057 def GetAllInstancesInfo(hypervisor_list):
1058 """Gather data about all instances.
1060 This is the equivalent of L{GetInstanceInfo}, except that it
1061 computes data for all instances at once, thus being faster if one
1062 needs data about more than one instance.
1064 @type hypervisor_list: list
1065 @param hypervisor_list: list of hypervisors to query for instance data
1068 @return: dictionary of instance: data, with data having the following keys:
1069 - memory: memory size of instance (int)
1070 - state: xen state of instance (string)
1071 - time: cpu time of instance (float)
1072 - vcpus: the number of vcpus
1077 for hname in hypervisor_list:
1078 iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
1080 for name, _, memory, vcpus, state, times in iinfo:
1088 # we only check static parameters, like memory and vcpus,
1089 # and not state and time which can change between the
1090 # invocations of the different hypervisors
1091 for key in "memory", "vcpus":
1092 if value[key] != output[name][key]:
1093 _Fail("Instance %s is running twice"
1094 " with different parameters", name)
1095 output[name] = value
1100 def _InstanceLogName(kind, os_name, instance, component):
1101 """Compute the OS log filename for a given instance and operation.
1103 The instance name and os name are passed in as strings since not all
1104 operations have these as part of an instance object.
1107 @param kind: the operation type (e.g. add, import, etc.)
1108 @type os_name: string
1109 @param os_name: the os name
1110 @type instance: string
1111 @param instance: the name of the instance being imported/added/etc.
1112 @type component: string or None
1113 @param component: the name of the component of the instance being
1117 # TODO: Use tempfile.mkstemp to create unique filename
1119 assert "/" not in component
1120 c_msg = "-%s" % component
1123 base = ("%s-%s-%s%s-%s.log" %
1124 (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1125 return utils.PathJoin(pathutils.LOG_OS_DIR, base)
1128 def InstanceOsAdd(instance, reinstall, debug):
1129 """Add an OS to an instance.
1131 @type instance: L{objects.Instance}
1132 @param instance: Instance whose OS is to be installed
1133 @type reinstall: boolean
1134 @param reinstall: whether this is an instance reinstall
1135 @type debug: integer
1136 @param debug: debug level, passed to the OS scripts
1140 inst_os = OSFromDisk(instance.os)
1142 create_env = OSEnvironment(instance, inst_os, debug)
1144 create_env["INSTANCE_REINSTALL"] = "1"
1146 logfile = _InstanceLogName("add", instance.os, instance.name, None)
1148 result = utils.RunCmd([inst_os.create_script], env=create_env,
1149 cwd=inst_os.path, output=logfile, reset_env=True)
1151 logging.error("os create command '%s' returned error: %s, logfile: %s,"
1152 " output: %s", result.cmd, result.fail_reason, logfile,
1154 lines = [utils.SafeEncode(val)
1155 for val in utils.TailFile(logfile, lines=20)]
1156 _Fail("OS create script failed (%s), last lines in the"
1157 " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1160 def RunRenameInstance(instance, old_name, debug):
1161 """Run the OS rename script for an instance.
1163 @type instance: L{objects.Instance}
1164 @param instance: Instance whose OS is to be installed
1165 @type old_name: string
1166 @param old_name: previous instance name
1167 @type debug: integer
1168 @param debug: debug level, passed to the OS scripts
1170 @return: the success of the operation
1173 inst_os = OSFromDisk(instance.os)
1175 rename_env = OSEnvironment(instance, inst_os, debug)
1176 rename_env["OLD_INSTANCE_NAME"] = old_name
1178 logfile = _InstanceLogName("rename", instance.os,
1179 "%s-%s" % (old_name, instance.name), None)
1181 result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1182 cwd=inst_os.path, output=logfile, reset_env=True)
1185 logging.error("os create command '%s' returned error: %s output: %s",
1186 result.cmd, result.fail_reason, result.output)
1187 lines = [utils.SafeEncode(val)
1188 for val in utils.TailFile(logfile, lines=20)]
1189 _Fail("OS rename script failed (%s), last lines in the"
1190 " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Build the path of the symlink for one of an instance's disks.

  The link lives under L{pathutils.DISK_LINKS_DIR} and is named after
  the instance, the disk separator and the disk index.

  """
  link_basename = "%s%s%d" % (instance_name, constants.DISK_SEPARATOR, idx)
  return utils.PathJoin(pathutils.DISK_LINKS_DIR, link_basename)
1198 def _SymlinkBlockDev(instance_name, device_path, idx):
1199 """Set up symlinks to a instance's block device.
1201 This is an auxiliary function run when an instance is start (on the primary
1202 node) or when an instance is migrated (on the target node).
1205 @param instance_name: the name of the target instance
1206 @param device_path: path of the physical block device, on the node
1207 @param idx: the disk index
1208 @return: absolute path to the disk's symlink
1211 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1213 os.symlink(device_path, link_name)
1214 except OSError, err:
1215 if err.errno == errno.EEXIST:
1216 if (not os.path.islink(link_name) or
1217 os.readlink(link_name) != device_path):
1218 os.remove(link_name)
1219 os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  Removal errors are logged but not propagated (best-effort cleanup).

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if not os.path.islink(link_name):
      continue
    try:
      os.remove(link_name)
    except OSError:
      logging.exception("Can't remove symlink '%s'", link_name)
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @return: list of (disk_object, device_path)

  for idx, disk in enumerate(instance.disks):
    # _RecursiveFindBD returns None when the device cannot be found
    device = _RecursiveFindBD(disk)
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
    # create the per-disk symlink; OSError from the symlink call is
    # translated into a BlockDeviceError below
    link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
    block_devices.append((disk, link_name))

  return block_devices
1269 def StartInstance(instance, startup_paused):
1270 """Start an instance.
1272 @type instance: L{objects.Instance}
1273 @param instance: the instance object
1274 @type startup_paused: bool
1275 @param instance: pause instance at startup?
1279 running_instances = GetInstanceList([instance.hypervisor])
1281 if instance.name in running_instances:
1282 logging.info("Instance %s already running, not starting", instance.name)
1286 block_devices = _GatherAndLinkBlockDevs(instance)
1287 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1288 hyper.StartInstance(instance, block_devices, startup_paused)
1289 except errors.BlockDeviceError, err:
1290 _Fail("Block device error: %s", err, exc=True)
1291 except errors.HypervisorError, err:
1292 _RemoveBlockDevLinks(instance.name, instance.disks)
1293 _Fail("Hypervisor error: %s", err, exc=True)
def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  # stopping a non-running instance is a no-op
  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
      # whether a (soft) StopInstance attempt was already made
      self.tried_once = False
      if iname not in hyper.ListInstances():
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True
      # signal utils.Retry to call us again after a delay
      raise utils.RetryAgain()

    # retry the soft shutdown every 5 seconds until the given timeout
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

    # cleanup failures are logged, not fatal, since the instance is down
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)
1365 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1366 """Reboot an instance.
1368 @type instance: L{objects.Instance}
1369 @param instance: the instance object to reboot
1370 @type reboot_type: str
1371 @param reboot_type: the type of reboot, one the following
1373 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1374 instance OS, do not recreate the VM
1375 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1376 restart the VM (at the hypervisor level)
1377 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1378 not accepted here, since that mode is handled differently, in
1379 cmdlib, and translates into full stop and start of the
1380 instance (instead of a call_instance_reboot RPC)
1381 @type shutdown_timeout: integer
1382 @param shutdown_timeout: maximum timeout for soft shutdown
1386 running_instances = GetInstanceList([instance.hypervisor])
1388 if instance.name not in running_instances:
1389 _Fail("Cannot reboot instance %s that is not running", instance.name)
1391 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1392 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1394 hyper.RebootInstance(instance)
1395 except errors.HypervisorError, err:
1396 _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1397 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1399 InstanceShutdown(instance, shutdown_timeout)
1400 return StartInstance(instance, False)
1401 except errors.HypervisorError, err:
1402 _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1404 _Fail("Invalid reboot_type received: %s", reboot_type)
1407 def InstanceBalloonMemory(instance, memory):
1408 """Resize an instance's memory.
1410 @type instance: L{objects.Instance}
1411 @param instance: the instance object
1413 @param memory: new memory amount in MB
1417 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1418 running = hyper.ListInstances()
1419 if instance.name not in running:
1420 logging.info("Instance %s is not running, cannot balloon", instance.name)
1423 hyper.BalloonInstanceMemory(instance, memory)
1424 except errors.HypervisorError, err:
1425 _Fail("Failed to balloon instance memory: %s", err, exc=True)
1428 def MigrationInfo(instance):
1429 """Gather information about an instance to be migrated.
1431 @type instance: L{objects.Instance}
1432 @param instance: the instance definition
1435 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1437 info = hyper.MigrationInfo(instance)
1438 except errors.HypervisorError, err:
1439 _Fail("Failed to fetch migration information: %s", err, exc=True)
1443 def AcceptInstance(instance, info, target):
1444 """Prepare the node to accept an instance.
1446 @type instance: L{objects.Instance}
1447 @param instance: the instance definition
1448 @type info: string/data (opaque)
1449 @param info: migration information, from the source node
1450 @type target: string
1451 @param target: target host (usually ip), on this node
1454 # TODO: why is this required only for DTS_EXT_MIRROR?
1455 if instance.disk_template in constants.DTS_EXT_MIRROR:
1456 # Create the symlinks, as the disks are not active
1459 _GatherAndLinkBlockDevs(instance)
1460 except errors.BlockDeviceError, err:
1461 _Fail("Block device error: %s", err, exc=True)
1463 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1465 hyper.AcceptInstance(instance, info, target)
1466 except errors.HypervisorError, err:
1467 if instance.disk_template in constants.DTS_EXT_MIRROR:
1468 _RemoveBlockDevLinks(instance.name, instance.disks)
1469 _Fail("Failed to accept instance: %s", err, exc=True)
1472 def FinalizeMigrationDst(instance, info, success):
1473 """Finalize any preparation to accept an instance.
1475 @type instance: L{objects.Instance}
1476 @param instance: the instance definition
1477 @type info: string/data (opaque)
1478 @param info: migration information, from the source node
1479 @type success: boolean
1480 @param success: whether the migration was a success or a failure
1483 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1485 hyper.FinalizeMigrationDst(instance, info, success)
1486 except errors.HypervisorError, err:
1487 _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1490 def MigrateInstance(instance, target, live):
1491 """Migrates an instance to another node.
1493 @type instance: L{objects.Instance}
1494 @param instance: the instance definition
1495 @type target: string
1496 @param target: the target node name
1498 @param live: whether the migration should be done live or not (the
1499 interpretation of this parameter is left to the hypervisor)
1500 @raise RPCFail: if migration fails for some reason
1503 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1506 hyper.MigrateInstance(instance, target, live)
1507 except errors.HypervisorError, err:
1508 _Fail("Failed to migrate instance: %s", err, exc=True)
1511 def FinalizeMigrationSource(instance, success, live):
1512 """Finalize the instance migration on the source node.
1514 @type instance: L{objects.Instance}
1515 @param instance: the instance definition of the migrated instance
1517 @param success: whether the migration succeeded or not
1519 @param live: whether the user requested a live migration or not
1520 @raise RPCFail: If the execution fails for some reason
1523 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1526 hyper.FinalizeMigrationSource(instance, success, live)
1527 except Exception, err: # pylint: disable=W0703
1528 _Fail("Failed to finalize the migration on the source node: %s", err,
1532 def GetMigrationStatus(instance):
1533 """Get the migration status
1535 @type instance: L{objects.Instance}
1536 @param instance: the instance that is being migrated
1537 @rtype: L{objects.MigrationStatus}
1538 @return: the status of the current migration (one of
1539 L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1540 progress info that can be retrieved from the hypervisor
1541 @raise RPCFail: If the migration status cannot be retrieved
1544 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1546 return hyper.GetMigrationStatus(instance)
1547 except Exception, err: # pylint: disable=W0703
1548 _Fail("Failed to get migration status: %s", err, exc=True)
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  # children must be assembled (and opened, on the primary) before the
  # device itself can be created on top of them
  for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device %s: %s", child, err)
    if on_primary or disk.AssembleOnSecondary():
      # we need the children open in case the device itself has to
        # pylint: disable=E1103
      except errors.BlockDeviceError, err:
        _Fail("Can't make child '%s' read-write: %s", child, err)

    device = bdev.Create(disk, clist)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    # record the new device in the cache, keyed by its path
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id
def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  wipe_cmd = [constants.DD_CMD,
              "if=/dev/zero",
              "seek=%d" % offset,
              "bs=%s" % block_size,
              "oflag=direct",
              "of=%s" % path,
              "count=%d" % size]
  result = utils.RunCmd(wipe_cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if rdev is None:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Do cross verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)
def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disks object we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      # lookup failure is reported per-disk instead of aborting the batch
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
    result = rdev.PauseResumeSync(pause)
      success.append((result, None))
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
  if rdev is not None:
    # remember the path before removal so the device cache entry can be
    # dropped afterwards
    r_path = rdev.dev_path
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
      DevCacheManager.RemoveCache(r_path)

    # errors are accumulated per child; failure of one child does not
    # stop removal of the others
    for child in disk.children:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

    _Fail("; ".join(msgs))
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

    # mcn is the maximum number of children that are allowed to fail
    # activation (None entries) before we give up
    mcn = disk.ChildrenNeeded()
      mcn = 0 # max number of Nones allowed
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # tolerate child failures up to mcn, then re-raise
        if children.count(None) >= mcn:
        logging.error("Error in child activation (but continuing): %s",
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    if as_primary or disk.OpenOnSecondary():
    # record the assembled device in the cache, keyed by its path
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable=E1103
      # on the primary, expose the device path and create the disk symlink
      result = result.dev_path
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    # raised by the symlink handling above
    _Fail("Error while symlinking disk: %s", err, exc=True)
def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
      # drop the cache entry only after a successful shutdown
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

    # child errors are accumulated; one failing child does not stop the
    # shutdown of its siblings
    for child in disk.children:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

    _Fail("; ".join(msgs))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  # resolve every child; all of them must be found before we touch the
  # parent device
  new_bdevs = [_RecursiveFindBD(child) for child in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  # collect the device paths of all children to be detached
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
      # no static path: resolve the child device dynamically
      bd = _RecursiveFindBD(disk)
        _Fail("Can't find device %s while removing children", disk)
        devs.append(bd.dev_path)
      # sanity-check the statically-known path before using it
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
  parent_bdev.RemoveChildren(devs)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  status_list = []
  for disk in disks:
    rbd = _RecursiveFindBD(disk)
    if rbd is None:
      _Fail("Can't find device %s", disk)

    status_list.append(rbd.CombinedSyncStatus())

  return status_list
def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

      rbd = _RecursiveFindBD(disk)
        # per-disk failure: report it instead of aborting the whole batch
        result.append((False, "Can't find device %s" % disk))
      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
      result.append((True, status))

  # one result entry per requested disk, in order
  assert len(disks) == len(result)
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  if disk.children:
    resolved = [_RecursiveFindBD(child) for child in disk.children]
  else:
    resolved = []

  return bdev.FindDevice(disk, resolved)
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk
1995 def BlockdevFind(disk):
1996 """Check if a device is activated.
1998 If it is, return information about the real device.
2000 @type disk: L{objects.Disk}
2001 @param disk: the disk to find
2002 @rtype: None or objects.BlockDevStatus
2003 @return: None if the disk cannot be found, otherwise a the current
2008 rbd = _RecursiveFindBD(disk)
2009 except errors.BlockDeviceError, err:
2010 _Fail("Failed to find device: %s", err, exc=True)
2015 return rbd.GetSyncStatus()
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      # per-disk lookup; a failed lookup yields a None entry rather
      # than aborting the whole query
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(rbd.GetActualSize())
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  # the write side runs on the destination node, over ssh
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.SSH_LOGIN_USER,

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  # security check: only whitelisted configuration files may be overwritten
  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  # translate symbolic owner/group names to numeric ids
  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)
def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout
2155 def _OSOndiskAPIVersion(os_dir):
2156 """Compute and return the API version of a given OS.
2158 This function will try to read the API version of the OS residing in
2159 the 'os_dir' directory.
2162 @param os_dir: the directory in which we should look for the OS
2164 @return: tuple (status, data) with status denoting the validity and
2165 data holding either the vaid versions or an error message
2168 api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2171 st = os.stat(api_file)
2172 except EnvironmentError, err:
2173 return False, ("Required file '%s' not found under path %s: %s" %
2174 (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2176 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2177 return False, ("File '%s' in %s is not a regular file" %
2178 (constants.OS_API_FILE, os_dir))
2181 api_versions = utils.ReadFile(api_file).splitlines()
2182 except EnvironmentError, err:
2183 return False, ("Error while reading the API version file at %s: %s" %
2184 (api_file, utils.ErrnoOrStr(err)))
2187 api_versions = [int(version.strip()) for version in api_versions]
2188 except (TypeError, ValueError), err:
2189 return False, ("API version(s) can't be converted to integer: %s" %
2192 return True, api_versions
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of support OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        # an unreadable search directory aborts the scan of later dirs
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        # invalid OSes are reported with their diagnose message instead
        # of being skipped
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.

  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
    # push the error message from _OSOndiskAPIVersion up unchanged
    return status, api_versions

  # the OS must support at least one API version this node understands
  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
    # pre-v20 OSes have no verify script
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      # a missing optional file is dropped from the dict, anything else
      # invalidates the OS
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %

    # OS scripts must also be executable by the owner
    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %

  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    # each line is "<name> <help text>"
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
      Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # "name" may carry a "+variant" suffix; only the base name exists on disk
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)
def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device

  """
  # Pick the highest API version supported by both this node and the OS
  # (continuation of an elided "api_version = \" assignment)
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants: only meaningful from API 15 onwards
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    # NOTE(review): an elided guard presumably falls back to the first
    # supported variant when the name carries none — confirm against upstream
      variant = inst_os.supported_variants[0]
  result["OS_VARIANT"] = variant

  # Export every OS parameter as OSP_<NAME>
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device

  """
  # Start from the OS-level environment and add instance-specific entries
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Per-disk variables; each disk must be resolvable to a block device
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
          instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      # file-based disks expose "file:<driver>"
      result["DISK_%d_BACKEND_TYPE" % idx] = \
          "file:%s" % disk.physical_id[0]

  # Per-NIC variables
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    # NOTE(review): an elided guard presumably skips the IP when unset
    result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    result["NIC_%d_NETWORK" % idx] = nic.network
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
          instance.hvparams[constants.HV_NIC_TYPE]

  # Backend (BE) and hypervisor (HV) parameters
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
def BlockdevGrow(disk, amount, dryrun, backingstore):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the errors message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  # Elided guard: fail when the device cannot be found on this node
    _Fail("Cannot find block device %s", disk)

  # Elided try around the grow operation
    r_dev.Grow(amount, dryrun, backingstore)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    # DRBD: snapshot the backing storage (first child) instead
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    # Elided else-branch: device not found
      _Fail("Cannot find block device %s", disk)
  # Elided else-branch: only DRBD8 and LV devices can be snapshotted
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk to be changed
  @type info: string
  @param info: new 'info' metadata
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the errors message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  # Elided guard: fail when the device cannot be found
    _Fail("Cannot find block device %s", disk)

  # Elided try around the (elided) r_dev.SetInfo(info) call
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  """
  # Build everything in "<name>.new" and rename into place at the end,
  # so a half-written export is never visible under the final name
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # Export-level metadata
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  # Instance-level data
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  # NOTE(review): nic_total accumulation is elided from this view
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
               ("%s" % disk.iv_name))
    config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
               ("%s" % disk.physical_id[1]))
    config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
  # NOTE(review): disk_total accumulation is elided from this view
  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    # Cluster-global hypervisor settings are deliberately not exported
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # Replace any previous export for this instance
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  config_file = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(config_file)

  # Both the export and the instance section must be present, otherwise
  # this is not a usable export
  required_sections = (constants.INISECT_EXP, constants.INISECT_INS)
  if not all(config.has_section(section) for section in required_sections):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  # Without the exports directory there is nothing to list
  if not os.path.isdir(pathutils.EXPORT_DIR):
    _Fail("No exports directory")

  return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
2688 def RemoveExport(export):
2689 """Remove an existing export from the node.
2692 @param export: the name of the export to remove
2696 target = utils.PathJoin(pathutils.EXPORT_DIR, export)
2699 shutil.rmtree(target)
2700 except EnvironmentError, err:
2701 _Fail("Error while removing the export: %s", err, exc=True)
def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
      rename it to
  @return: True if all renames succeeded, False otherwise

  """
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    # Elided guard: record failure when the device is not found
      msgs.append("Can't find device %s in rename" % str(disk))

    # Elided try around the rename; errors are collected, not raised
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        # Invalidate the cache entry of the now-stale device path
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
  # Elided guard: fail only when any error messages were collected
    _Fail("; ".join(msgs))
def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check
  @rtype: str
  @return: the normalized path if valid, None otherwise

  """
  file_storage_enabled = (constants.ENABLE_FILE_STORAGE or
                          constants.ENABLE_SHARED_FILE_STORAGE)
  if not file_storage_enabled:
    _Fail("File storage disabled at configure time")

  # Raises if the path is outside the allowed storage directories
  bdev.CheckFileStoragePath(fs_dir)

  return os.path.normpath(fs_dir)
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  # Validate and normalize the path first
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    # Existing path is fine as long as it is a directory
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
  # Elided else-branch with try around the creation
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  # A non-existing directory counts as already removed
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified Storage directory '%s' is not a directory",
    # deletes dir only if empty, otherwise we want to fail the rpc call
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  # Both paths are validated against the allowed storage locations
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      # Elided try around the rename
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    # Elided else-branch: source must be a directory
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  # Elided else-branch: target already exists
    if os.path.exists(old_file_storage_dir):
      # Both source and target exist: refuse to clobber anything
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @raises RPCFail: if the file is not valid

  """
  queue_dir = pathutils.QUEUE_DIR

  # Anything outside the queue directory is rejected
  if utils.IsBelowDir(queue_dir, file_name):
    return

  _Fail("Passed job queue file '%s' does not belong to"
        " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @param content: the new job contents
  @return: the success of the operation

  """
  # Convert the virtual-cluster path into the node-local one, then make
  # sure it really lives inside the queue directory
  real_name = vcluster.LocalizeVirtualPath(file_name)
  _EnsureJobQueueFile(real_name)

  # Write and replace the file atomically, owned by the master daemon
  getents = runtime.GetEnts()
  utils.WriteFile(real_name, data=_Decompress(content),
                  uid=getents.masterd_uid, gid=getents.masterd_gid)
2884 def JobQueueRename(old, new):
2885 """Renames a job queue file.
2887 This is just a wrapper over os.rename with proper checking.
2890 @param old: the old (actual) file name
2892 @param new: the desired file name
2894 @return: the success of the operation and payload
2897 old = vcluster.LocalizeVirtualPath(old)
2898 new = vcluster.LocalizeVirtualPath(new)
2900 _EnsureJobQueueFile(old)
2901 _EnsureJobQueueFile(new)
2903 getents = runtime.GetEnts()
2905 utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
2906 dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)
def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the succes of the operation, and message
      which will contain the error details in case we
      failed

  """
  # Elided loop over disks, resolving each to a block device
    rd = _RecursiveFindBD(cf)
    # Elided guard: the device must exist on this node
      _Fail("Can't find device %s", cf)

  # Elided loop closing each device inside a try block
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  # Elided guard: fail only when close errors were collected
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  # Remove the instance's device symlinks once the devices are closed
    _RemoveBlockDevLinks(instance_name, disks)
2946 def ValidateHVParams(hvname, hvparams):
2947 """Validates the given hypervisor parameters.
2949 @type hvname: string
2950 @param hvname: the hypervisor name
2951 @type hvparams: dict
2952 @param hvparams: the hypervisor parameters to be validated
2957 hv_type = hypervisor.GetHypervisor(hvname)
2958 hv_type.ValidateParameters(hvparams)
2959 except errors.HypervisorError, err:
2960 _Fail(str(err), log=False)
def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  # supported_parameters holds (name, description) pairs; compare names only
  supported = frozenset(v[0] for v in os_obj.supported_parameters)
  unsupported = frozenset(parameters) - supported
  if unsupported:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(unsupported)))
def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  # Strip a possible "+variant" suffix before looking the OS up on disk
  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  # Elided: handling of a missing OS, honouring "required"
  # NOTE(review): pre-2.0 API OSes presumably pass trivially here — confirm
  if max(tbv.api_versions) < constants.OS_API_V20:
  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())
  validate_env = OSCoreEnv(osname, tbv, osparams)
  # Run the OS' own verify script inside its directory with a clean env
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  # Elided guard: the following only runs when the command failed
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  # A running master daemon is another sign this node is (still) master
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  # Back up the configuration file before removing it (elided try
  # around this block)
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    # A vanished file is fine; anything else is a real error
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)
def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  @type cryptodir: string
  @param cryptodir: directory holding the certificate directories
  @type name: string
  @param name: certificate name
  @rtype: tuple; (string, string, string)
  @return: certificate directory, key file path and certificate file path

  """
  cert_dir = utils.PathJoin(cryptodir, name)
  return (cert_dir,
          utils.PathJoin(cert_dir, _X509_KEY_FILE),
          utils.PathJoin(cert_dir, _X509_CERT_FILE))
def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  # Cap the requested validity at the cluster-wide maximum
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  # A unique directory per certificate; its basename becomes the cert name
  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  # Elided try: the directory is removed below on failure
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  # Elided except-branch: clean up on any error and re-raise
    shutil.rmtree(cert_dir, ignore_errors=True)
def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes a X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  # Remove the files first, then (in an elided try) the directory itself
  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  # Plain file under the exports directory
  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    # Resolve symlinks before checking the containment, so a link cannot
    # escape the exports directory
    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    # Shell redirection into/out of the file
    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

    # Retrieve file size (elided try around the stat; a missing file is
    # only logged, not fatal)
      st = os.stat(filename)
    except EnvironmentError, err:
      logging.error("Can't stat(2) %s: %s", filename, err)
      exp_size = utils.BytesToMebibyte(st.st_size)

  # Raw block device, piped through dd
  elif ieio == constants.IEIO_RAW_DISK:
    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly ignored; we use nocreat to fail if the device
      # is not already there or we pass a wrong path; we use notrunc to no
      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
      # much memory; this means that at best, we flush every 64k, which will
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   str(1024 * 1024), # 1 MB

      exp_size = disk.size

  # OS import/export script
  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

      # Let script predict size
      exp_size = constants.IE_CUSTOM_SIZE

  # Elided else-branch: unknown I/O type
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  @type prefix: string
  @param prefix: directory name prefix (combined with a timestamp in an
      elided continuation line)
  @rtype: string
  @return: path of the newly created, uniquely named directory

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          (prefix, utils.TimestampForFilename())))
def StartImportExportDaemon(mode, opts, host, port, instance, component,
  """Starts an import or export daemon.

  @param mode: Import/output mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @param host: Remote host for export (None for import)
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    # Elided: sets the "import" log/status prefix
    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    # Elided: sets the "export" log/status prefix
    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  # Elided else-branch: neither import nor export
    _Fail("Invalid mode %r", mode)

  # Either both key name and CA are given, or neither (cluster cert)
  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use the node daemon's own certificate for both key and cert
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  # Elided else-branch: use the named X509 certificate
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  # Elided try: the status directory is removed on failure (see below)
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use the node daemon certificate as CA
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    # Elided else-branch: use the CA provided in the options

    utils.WriteFile(ca_file, data=ca, mode=0400)

    # Elided: start of the daemon command-line list
      pathutils.IMPORT_EXPORT_DAEMON,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,

    # Elided guards: each option appended only when set
      cmd.append("--host=%s" % host)
      cmd.append("--port=%s" % port)
      cmd.append("--ipv6")
      cmd.append("--ipv4")
      cmd.append("--compress=%s" % opts.compress)
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

      cmd.append("--cmd-prefix=%s" % cmd_prefix)
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  # Elided except-branch: remove the status dir on failure and re-raise
    shutil.rmtree(status_dir, ignore_errors=True)
def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if a
      status couldn't be read

  """
  # Elided loop over "names"; one status entry is produced per name
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
    # Elided try: a missing status file yields a None entry
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      # Only a missing file is tolerated
      if err.errno != errno.ENOENT:
    result.append(serializer.LoadJson(data))
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  @type name: string
  @param name: the import/export (status directory) name

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  # Elided guard: only signal when a live PID was found
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
    # The process may already be gone; that is not an error
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  @type name: string
  @param name: the import/export (status directory) name

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  # Elided guard: only kill when a live PID was found
    logging.info("Import/export %s is still running with PID %s",
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  # Elided loop over "disks" binding each disk object to "cf"
    cf.SetPhysicalID(my_name, nodes_ip)

    rd = _RecursiveFindBD(cf)
    # Elided guard: fail when the device is not found on this node
      _Fail("Can't find device %s", cf)
def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # Elided loop: each device is switched to standalone mode in a try block
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
3459 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3460 """Attaches the network on a list of drbd devices.
3463 bdevs = _FindDisks(nodes_ip, disks)
# In multimaster mode (live migration) the instance's device symlinks
# are (re)created on this node before reconnecting the network.
3466 for idx, rd in enumerate(bdevs):
3468 _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3469 except EnvironmentError, err:
3470 _Fail("Can't create symlink: %s", err)
3471 # reconnect disks, switch to new master configuration and if
3472 # needed primary mode
3475 rd.AttachNet(multimaster)
3476 except errors.BlockDeviceError, err:
3477 _Fail("Can't change network configuration: %s", err)
3479 # wait until the disks are connected; we need to retry the re-attach
3480 # if the device becomes standalone, as this might happen if the one
3481 # node disconnects and reconnects in a different mode before the
3482 # other node reconnects; in this case, one or both of the nodes will
3483 # decide it has wrong configuration and switch to standalone
# _Attach is the retry body passed to utils.Retry below: it polls
# /proc/drbd status for every device and raises RetryAgain until all
# devices are connected or resyncing.
3486 all_connected = True
3489 stats = rd.GetProcStatus()
3491 all_connected = (all_connected and
3492 (stats.is_connected or stats.is_in_resync))
3494 if stats.is_standalone:
3495 # peer had different config info and this node became
3496 # standalone, even though this should not happen with the
3497 # new staged way of changing disk configs
3499 rd.AttachNet(multimaster)
3500 except errors.BlockDeviceError, err:
3501 _Fail("Can't change network configuration: %s", err)
3503 if not all_connected:
3504 raise utils.RetryAgain()
3507 # Start with a delay of 100 miliseconds and go up to 5 seconds
# (0.1, 1.5, 5.0) = (start delay, backoff factor, max delay); overall
# timeout is two minutes before RetryTimeout is raised.
3508 utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3509 except utils.RetryTimeout:
3510 _Fail("Timeout in disk reconnecting")
3513 # change to primary mode
# Elided lines presumably call a primary-promotion method on each device
# when multimaster is requested — TODO confirm.
3517 except errors.BlockDeviceError, err:
3518 _Fail("Can't change to primary mode: %s", err)
3521 def DrbdWaitSync(nodes_ip, disks):
3522 """Wait until DRBDs have synchronized.
# Retry helper: raises RetryAgain while the device is neither connected
# nor resyncing, so utils.Retry keeps polling.
3526 stats = rd.GetProcStatus()
3527 if not (stats.is_connected or stats.is_in_resync):
3528 raise utils.RetryAgain()
3531 bdevs = _FindDisks(nodes_ip, disks)
3537 # poll each second for 15 seconds
3538 stats = utils.Retry(_helper, 1, 15, args=[rd])
# On timeout, take one final status snapshot and decide from that.
3539 except utils.RetryTimeout:
3540 stats = rd.GetProcStatus()
3542 if not (stats.is_connected or stats.is_in_resync):
3543 _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
# Aggregate across devices: alldone iff no device is still resyncing;
# min_resync tracks the lowest sync percentage seen.
3544 alldone = alldone and (not stats.is_in_resync)
3545 if stats.sync_percent is not None:
3546 min_resync = min(min_resync, stats.sync_percent)
3548 return (alldone, min_resync)
3551 def GetDrbdUsermodeHelper():
3552 """Returns DRBD usermode helper currently configured.
# Thin wrapper: delegates to bdev and converts block-device errors into
# an RPC failure (elided _Fail call presumably follows — TODO confirm).
3556 return bdev.BaseDRBD.GetUsermodeHelper()
3557 except errors.BlockDeviceError, err:
3561 def PowercycleNode(hypervisor_type):
3562 """Hard-powercycle the node.
3564 Because we need to return first, and schedule the powercycle in the
3565 background, we won't be able to report failures nicely.
3568 hyper = hypervisor.GetHypervisor(hypervisor_type)
3572 # if we can't fork, we'll pretend that we're in the child process
# The parent returns immediately so the RPC answer reaches the caller
# before the node is powercycled; the child performs the actual reboot.
3575 return "Reboot scheduled in 5 seconds"
3576 # ensure the child is running on ram
# Broad except is deliberate: the mlockall/setup step below is
# best-effort and must not stop the powercycle.
3579 except Exception: # pylint: disable=W0703
3582 hyper.PowercycleNode()
3585 def _VerifyRemoteCommandName(cmd):
3586 """Verifies a remote command name.
3589 @param cmd: Command name
3590 @rtype: tuple; (boolean, string or None)
3591 @return: The tuple's first element is the status; if C{False}, the second
3592 element is an error message string, otherwise it's C{None}
3596 return (False, "Missing command name")
# basename check rejects any path component ("../x", "a/b"), preventing
# directory traversal out of the remote-commands directory.
3598 if os.path.basename(cmd) != cmd:
3599 return (False, "Invalid command name")
# Whitelist of allowed characters via the EXT_PLUGIN_MASK regex.
3601 if not constants.EXT_PLUGIN_MASK.match(cmd):
3602 return (False, "Command name contains forbidden characters")
3607 def _CommonRemoteCommandCheck(path, owner):
3608 """Common checks for remote command file system directories and files.
3611 @param path: Path to check
3612 @param owner: C{None} or tuple containing UID and GID
3613 @rtype: tuple; (boolean, string or C{os.stat} result)
3614 @return: The tuple's first element is the status; if C{False}, the second
3615 element is an error message string, otherwise it's the result of C{os.stat}
3619 # Default to root as owner
# stat(2) the path; any OS error (missing file, permission) fails the
# check with the error embedded in the message.
3624 except EnvironmentError, err:
3625 return (False, "Can't stat(2) '%s': %s" % (path, err))
# Reject if any permission bit outside the allowed mask is set
# (e.g. group/other-writable) — the command tree must be locked down.
3627 if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
3628 return (False, "Permissions on '%s' are too permissive" % path)
# Ownership must match exactly (uid AND gid).
3630 if (st.st_uid, st.st_gid) != owner:
3631 (owner_uid, owner_gid) = owner
3632 return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))
3637 def _VerifyRemoteCommandDirectory(path, _owner=None):
3638 """Verifies remote command directory.
3641 @param path: Path to check
3642 @rtype: tuple; (boolean, string or None)
3643 @return: The tuple's first element is the status; if C{False}, the second
3644 element is an error message string, otherwise it's C{None}
# Delegate the stat/mode/owner checks, then additionally require the
# path to be a directory. _owner exists for unittest injection.
3647 (status, value) = _CommonRemoteCommandCheck(path, _owner)
3650 return (False, value)
3652 if not stat.S_ISDIR(value.st_mode):
3653 return (False, "Path '%s' is not a directory" % path)
3658 def _VerifyRemoteCommand(path, cmd, _owner=None):
3659 """Verifies a whole remote command and returns its executable filename.
3662 @param path: Directory containing remote commands
3664 @param cmd: Command name
3665 @rtype: tuple; (boolean, string)
3666 @return: The tuple's first element is the status; if C{False}, the second
3667 element is an error message string, otherwise the second element is the
3668 absolute path to the executable
# PathJoin (not os.path.join) — it rejects absolute/escaping components,
# keeping the executable inside the commands directory.
3671 executable = utils.PathJoin(path, cmd)
3673 (status, msg) = _CommonRemoteCommandCheck(executable, _owner)
# Beyond mode/owner checks, the file must actually be executable
# according to access(2).
3678 if not utils.IsExecutable(executable):
3679 return (False, "access(2) thinks '%s' can't be executed" % executable)
3681 return (True, executable)
3684 def _PrepareRemoteCommand(path, cmd,
3685 _verify_dir=_VerifyRemoteCommandDirectory,
3686 _verify_name=_VerifyRemoteCommandName,
3687 _verify_cmd=_VerifyRemoteCommand):
3688 """Performs a number of tests on a remote command.
3691 @param path: Directory containing remote commands
3693 @param cmd: Command name
3694 @return: Same as L{_VerifyRemoteCommand}
# Staged verification: directory, then name, then the executable itself.
# The _verify_* keyword arguments are injection points for unittests.
3697 # Verify the directory first
3698 (status, msg) = _verify_dir(path)
3700 # Check command if everything was alright
3701 (status, msg) = _verify_name(cmd)
3706 # Check actual executable
3707 return _verify_cmd(path, cmd)
3710 def RunRemoteCommand(cmd,
3711 _lock_timeout=_RCMD_LOCK_TIMEOUT,
3712 _lock_file=pathutils.REMOTE_COMMANDS_LOCK_FILE,
3713 _path=pathutils.REMOTE_COMMANDS_DIR,
3714 _sleep_fn=time.sleep,
3715 _prepare_fn=_PrepareRemoteCommand,
3716 _runcmd_fn=utils.RunCmd,
3717 _enabled=constants.ENABLE_REMOTE_COMMANDS):
3718 """Executes a remote command after performing strict tests.
3721 @param cmd: Command name
3723 @return: Command output
3724 @raise RPCFail: In case of an error
# All underscore-prefixed parameters are unittest injection points; the
# public interface is just `cmd`.
3727 logging.info("Preparing to run remote command '%s'", cmd)
# Feature gate decided at configure time.
3730 _Fail("Remote commands disabled at configure time")
# Exclusive file lock serializes remote command execution node-wide.
3736 lock = utils.FileLock.Open(_lock_file)
3737 lock.Exclusive(blocking=True, timeout=_lock_timeout)
3739 (status, value) = _prepare_fn(_path, cmd)
# reset_env + empty env: the command runs with a clean environment;
# postfork_fn releases the lock in the parent once the child is forked.
3742 cmdresult = _runcmd_fn([value], env={}, reset_env=True,
3743 postfork_fn=lambda _: lock.Unlock())
3745 logging.error(value)
3746 except Exception: # pylint: disable=W0703
3747 # Keep original error in log
3748 logging.exception("Caught exception")
# Constant delay before reporting an invalid command, so callers can't
# distinguish failure causes by timing.
3750 if cmdresult is None:
3751 logging.info("Sleeping for %0.1f seconds before returning",
3752 _RCMD_INVALID_DELAY)
3753 _sleep_fn(_RCMD_INVALID_DELAY)
3755 # Do not include original error message in returned error
3756 _Fail("Executing command '%s' failed" % cmd)
3757 elif cmdresult.failed or cmdresult.fail_reason:
3758 _Fail("Remote command '%s' failed: %s; output: %s",
3759 cmd, cmdresult.fail_reason, cmdresult.output)
3761 return cmdresult.output
# Release the lock even on the error paths above (presumably inside a
# finally block whose header is elided here — TODO confirm).
3763 if lock is not None:
3764 # Release lock at last
3769 class HooksRunner(object):
3772 This class is instantiated on the node side (ganeti-noded) and not
3776 def __init__(self, hooks_base_dir=None):
3777 """Constructor for hooks runner.
3779 @type hooks_base_dir: str or None
3780 @param hooks_base_dir: if not None, this overrides the
3781 L{pathutils.HOOKS_BASE_DIR} (useful for unittests)
3784 if hooks_base_dir is None:
3785 hooks_base_dir = pathutils.HOOKS_BASE_DIR
3786 # yeah, _BASE_DIR is not valid for attributes, we use it like a
# constant (hence the upper-case name and the pylint suppression)
3788 self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3790 def RunLocalHooks(self, node_list, hpath, phase, env):
3791 """Check that the hooks will be run only locally and then run them.
# Both asserts enforce that the single target node is this node itself;
# only then are the hooks executed locally.
3794 assert len(node_list) == 1
3796 _, myself = ssconf.GetMasterAndMyself()
3797 assert node == myself
3799 results = self.RunHooks(hpath, phase, env)
3801 # Return values in the form expected by HooksMaster
3802 return {node: (None, False, results)}
3804 def RunHooks(self, hpath, phase, env):
3805 """Run the scripts in the hooks directory.
3808 @param hpath: the path to the hooks directory which
3811 @param phase: either L{constants.HOOKS_PHASE_PRE} or
3812 L{constants.HOOKS_PHASE_POST}
3814 @param env: dictionary with the environment for the hook
3816 @return: list of 3-element tuples:
3818 - script result, either L{constants.HKR_SUCCESS} or
3819 L{constants.HKR_FAIL}
3820 - output of the script
3822 @raise errors.ProgrammerError: for invalid input
# Map the phase to a directory suffix; an unknown phase is a caller bug.
3826 if phase == constants.HOOKS_PHASE_PRE:
3828 elif phase == constants.HOOKS_PHASE_POST:
3831 _Fail("Unknown hooks phase '%s'", phase)
# Hooks live in <base>/<hpath>-<suffix>.d/
3833 subdir = "%s-%s.d" % (hpath, suffix)
3834 dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3838 if not os.path.isdir(dir_name):
3839 # for non-existing/non-dirs, we simply exit instead of logging a
3840 # warning at every operation
# RunParts executes every script in the directory with the given
# (reset) environment and reports per-script status.
3843 runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
# Translate RunParts statuses into hook result codes.
3845 for (relname, relstatus, runresult) in runparts_results:
3846 if relstatus == constants.RUNPARTS_SKIP:
3847 rrval = constants.HKR_SKIP
3849 elif relstatus == constants.RUNPARTS_ERR:
3850 rrval = constants.HKR_FAIL
3851 output = "Hook script execution error: %s" % runresult
3852 elif relstatus == constants.RUNPARTS_RUN:
3853 if runresult.failed:
3854 rrval = constants.HKR_FAIL
3856 rrval = constants.HKR_SUCCESS
3857 output = utils.SafeEncode(runresult.output.strip())
3858 results.append(("%s/%s" % (subdir, relname), rrval, output))
3863 class IAllocatorRunner(object):
3864 """IAllocator runner.
3866 This class is instantiated on the node side (ganeti-noded) and not on
3871 def Run(name, idata):
3872 """Run an iallocator script.
3875 @param name: the iallocator script name
3877 @param idata: the allocator input data
3880 @return: two element tuple of:
3882 - either error message or stdout of allocator (for success)
# Locate the script on the configured search path; an unknown name is
# an RPC failure, not an exception.
3885 alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3887 if alloc_script is None:
3888 _Fail("iallocator module '%s' not found in the search path", name)
# Input data is handed to the script via a temporary file (presumably
# written to fin_name and cleaned up in elided lines — TODO confirm).
3890 fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3894 result = utils.RunCmd([alloc_script, fin_name])
3896 _Fail("iallocator module '%s' failed: %s, output '%s'",
3897 name, result.fail_reason, result.output)
3901 return result.stdout
3904 class DevCacheManager(object):
3905 """Simple class for managing a cache of block device information.
# One cache file per block device, stored under BDEV_CACHE_DIR; all
# operations are best-effort (failures are logged, never raised).
3908 _DEV_PREFIX = "/dev/"
3909 _ROOT_DIR = pathutils.BDEV_CACHE_DIR
3912 def _ConvertPath(cls, dev_path):
3913 """Converts a /dev/name path to the cache file name.
3915 This replaces slashes with underscores and strips the /dev
3916 prefix. It then returns the full path to the cache file.
3919 @param dev_path: the C{/dev/} path name
3921 @return: the converted path name
# e.g. /dev/drbd/by-res/x -> <root>/bdev_drbd_by-res_x
3924 if dev_path.startswith(cls._DEV_PREFIX):
3925 dev_path = dev_path[len(cls._DEV_PREFIX):]
3926 dev_path = dev_path.replace("/", "_")
3927 fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3931 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3932 """Updates the cache information for a given device.
3935 @param dev_path: the pathname of the device
3937 @param owner: the owner (instance name) of the device
3938 @type on_primary: bool
3939 @param on_primary: whether this is the primary
3942 @param iv_name: the instance-visible name of the
3943 device, as in objects.Disk.iv_name
# Defensive: callers may pass None for devices without a path; log and
# bail out instead of crashing.
3948 if dev_path is None:
3949 logging.error("DevCacheManager.UpdateCache got a None dev_path")
3951 fpath = cls._ConvertPath(dev_path)
3957 iv_name = "not_visible"
# Cache record format: "<owner> <state> <iv_name>\n"
3958 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3960 utils.WriteFile(fpath, data=fdata)
# Cache write failures are non-fatal — only logged.
3961 except EnvironmentError, err:
3962 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3965 def RemoveCache(cls, dev_path):
3966 """Remove data for a dev_path.
3968 This is just a wrapper over L{utils.io.RemoveFile} with a converted
3969 path name and logging.
3972 @param dev_path: the pathname of the device
3977 if dev_path is None:
3978 logging.error("DevCacheManager.RemoveCache got a None dev_path")
3980 fpath = cls._ConvertPath(dev_path)
3982 utils.RemoveFile(fpath)
# Removal failures are likewise non-fatal.
3983 except EnvironmentError, err:
3984 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)