4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25 the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27 in the L{_CleanDirectory} function
31 # pylint: disable=E1103
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
# File whose content the kernel regenerates on each boot; reading it lets
# us detect node reboots
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
# Whitelist of directories that _CleanDirectory may wipe
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.CRYPTO_KEYS_DIR,
# Maximum validity of the generated X509 import/export certificates, in
# seconds (one week)
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
# File names used inside a per-transfer import/export status directory
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
#: Valid LVS output line regex; a raw string is used so that the
#: backslashes are regex escapes (literal "|") rather than (invalid)
#: Python string escapes, which raise DeprecationWarning/SyntaxWarning
#: on newer interpreters
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """
def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @param msg: the text of the exception

  """
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      # log with the current exception's traceback attached
      logging.exception(msg)
115 """Simple wrapper to return a SimpleStore.
117 @rtype: L{ssconf.SimpleStore}
118 @return: a SimpleStore instance
121 return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client; a two-element (encoding, content)
      pair
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # payload was zlib-compressed and then base64-wrapped by the client
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  # refuse to operate outside the whitelisted directories
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
  if not os.path.isdir(path):
  # Normalize excluded paths
  exclude = [os.path.normpath(i) for i in exclude]
  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
    # only regular files are removed; symlinks and directories are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  @rtype: frozenset
  @return: the set of file paths UploadFile may write to

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.SPICE_CERT_FILE,
    constants.SPICE_CACERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
  # each hypervisor may declare additional ancillary files of its own
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())
  return frozenset(allowed_files)
# Computed once at module import time; see _BuildUploadFileList
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
217 """Removes job queue files and archived jobs.
223 _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
224 _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
228 """Returns master information.
230 This is an utility function to compute master information, either
231 for consumption here or from the node daemon.
234 @return: master_netdev, master_ip, master_name, primary_ip_family,
236 @raise RPCFail: in case of errors
241 master_netdev = cfg.GetMasterNetdev()
242 master_ip = cfg.GetMasterIP()
243 master_netmask = cfg.GetMasterNetmask()
244 master_node = cfg.GetMasterNode()
245 primary_ip_family = cfg.GetPrimaryIPFamily()
246 except errors.ConfigurationError, err:
247 _Fail("Cluster configuration incomplete: %s", err, exc=True)
248 return (master_netdev, master_ip, master_node, primary_ip_family,
def ActivateMasterIp():
  """Activate the IP address of the master daemon.

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family, master_netmask = GetMasterInfo()
  if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if netutils.IPAddress.Own(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
      err_msg = "Someone else has the master ip, not activating"
      logging.error(err_msg)
    # default to IPv4; switch class if the cluster primary family is IPv6
    ipcls = netutils.IP4Address
    if family == netutils.IP6Address.family:
      ipcls = netutils.IP6Address
    result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                           "%s/%s" % (master_ip, master_netmask),
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
      err_msg = "Can't activate master IP: %s" % result.output
      logging.error(err_msg)
    # we ignore the exit code of the following cmds
    if ipcls == netutils.IP4Address:
      # gratuitous ARP so neighbours update their caches
      utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
                    master_ip, master_ip])
    elif ipcls == netutils.IP6Address:
        # IPv6 equivalent: unsolicited neighbour advertisement
        utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
      except errors.OpExecError:
        # TODO: Better error reporting
        logging.warning("Can't execute ndisc6, please install if missing")
def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """
    masterd_args = "--no-voting --yes-do-it"
    # passed through the environment to daemon-util
    "EXTRA_MASTERD_ARGS": masterd_args,
  result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    msg = "Can't start Ganeti master: %s" % result.output
def DeactivateMasterIp():
  """Deactivate the master IP on this node.

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, _, master_netmask = GetMasterInfo()
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, master_netmask),
                         "dev", master_netdev])
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure
def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this
  result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  result.cmd, result.exit_code, result.output)
def ChangeMasterNetmask(netmask):
  """Change the netmask of the master IP.

  @param netmask: the new netmask to apply to the master IP

  """
  master_netdev, master_ip, _, _, old_netmask = GetMasterInfo()
  if old_netmask == netmask:
  # add the address with the new netmask first...
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
    _Fail("Could not change the master IP netmask")
  # ...then remove the address with the old netmask
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
    _Fail("Could not change the master IP netmask")
def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      # BUG fix: the exception object used to be created but never
      # raised, silently ignoring the invalid parameter combination
      raise RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
                    " given")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      raise RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
                    " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    raise RPCFail("Mode not supported")
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @type modify_ssh_setup: boolean
  @param modify_ssh_setup: whether to also remove the cluster SSH keys

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")
    # best-effort removal of the cluster secrets
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CERT_FILE)
    utils.RemoveFile(constants.SPICE_CACERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")
  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)
  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB
      - hv_version: the hypervisor version, if available

  """
  if vgname is not None:
    vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
    vg_free = vg_size = None
      vg_free = int(round(vginfo[0][0], 0))
      vg_size = int(round(vginfo[0][1], 0))
    outputarray["vg_size"] = vg_size
    outputarray["vg_free"] = vg_free
  if hypervisor_type is not None:
    hyper = hypervisor.GetHypervisor(hypervisor_type)
    hyp_info = hyper.GetNodeInfo()
    if hyp_info is not None:
      outputarray.update(hyp_info)
  # the boot id changes on every reboot, so it identifies the current boot
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))
  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])
  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
      nodes.extend(bynode[my_name])

    # Use a random order so different nodes probe in different order
    random.shuffle(nodes)

    # Try to contact all nodes
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
    result[constants.NV_NODELIST] = val
  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    # find our own primary/secondary IP in the node list
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
          tmp[name] = ("failure using the %s interface(s)" %
  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
        tmp.append("error stating out of band helper: %s" % err)
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append("out of band helper %s is not executable" % path)
          tmp.append("out of band helper %s is not a file" % path)
  if constants.NV_LVLIST in what and vm_capable:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    result[constants.NV_LVLIST] = val
  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    result[constants.NV_INSTANCELIST] = val
  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
  if constants.NV_PVLIST in what and vm_capable:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)
  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)
  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
  if constants.NV_DRBDLIST in what and vm_capable:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors
  if constants.NV_DRBDHELPER in what and vm_capable:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
    result[constants.NV_DRBDHELPER] = (status, payload)
  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")
  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())
  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()
  if constants.NV_BRIDGES in what and vm_capable:
    # report only the bridges that are missing
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]
def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  for devpath in devices:
    # only paths under /dev are considered
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
      # blockdev reports bytes; convert to MiB
      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
  for line in result.stdout.splitlines():
    match = _LVSLINE_REGEX.match(line)
      logging.error("Invalid line returned from lvs output: '%s'", line)
    vg_name, name, size, attr = match.groups()
    # decode the lv_attr flag characters (see lvs(8))
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
      # we don't want to report such volumes as existing, since they
      # don't really hold data
    lvs[vg_name + "/" + name] = (size, inactive, online)
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
788 """List all volumes on this node.
792 A list of dictionaries, each having four keys:
793 - name: the logical volume name,
794 - size: the size of the logical volume
795 - dev: the physical device on which the LV lives
796 - vg: the volume group to which it belongs
798 In case of errors, we return an empty list and log the
801 Note that since a logical volume can live on multiple physical
802 volumes, the resulting list might include a logical volume
806 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
808 "--options=lv_name,lv_size,devices,vg_name"])
810 _Fail("Failed to list logical volumes, lvs output: %s",
814 return dev.split("(")[0]
817 return [parse_dev(x) for x in dev.split(",")]
820 line = [v.strip() for v in line]
821 return [{"name": line[0], "size": line[1],
822 "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
825 for line in result.stdout.splitlines():
826 if line.count("|") >= 3:
827 all_devs.extend(map_line(line.split("|")))
829 logging.warning("Strange line in the output from lvs: '%s'", line)
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: the bridge names to check for
  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  for hname in hypervisor_list:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance
  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # selected fields of the hypervisor's info tuple
    output["memory"] = iinfo[2]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)
  # every disk must have its symlink in place for migration to work
  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
      for name, _, memory, vcpus, state, times in iinfo:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      imported/added/etc.
  @rtype: string
  @return: the full path of the log file

  """
  # TODO: Use tempfile.mkstemp to create unique filename
    assert "/" not in component
    c_msg = "-%s" % component
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, base)
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)
  create_env = OSEnvironment(instance, inst_os, debug)
    # tell the OS scripts that this is a reinstall, not a fresh install
    create_env["INSTANCE_REINSTALL"] = "1"
  logfile = _InstanceLogName("add", instance.os, instance.name, None)
  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
    # include the tail of the log file in the error for easier debugging
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)
  rename_env = OSEnvironment(instance, inst_os, debug)
  # the OS script learns about the old name through the environment
  rename_env["OLD_INSTANCE_NAME"] = old_name
  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
    logging.error("os create command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    # include the tail of the log file in the error for easier debugging
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Compute the path of the symlink for an instance's disk.

  @param instance_name: the name of the owning instance
  @param idx: the index of the disk
  @return: the path under the disk links directory

  """
  link_basename = "%s%s%d" % (instance_name, constants.DISK_SEPARATOR, idx)
  return utils.PathJoin(constants.DISK_LINKS_DIR, link_basename)
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      # a link already exists; replace it only if it points elsewhere
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @param instance_name: the name of the owning instance
  @param disks: the instance's disks; only their count is used here

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
        os.remove(link_name)
        # best-effort: log the failure but keep removing the other links
        logging.exception("Can't remove symlink '%s'", link_name)
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we shoul assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
    block_devices.append((disk, link_name))
  return block_devices
def StartInstance(instance, startup_paused):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])
  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    # link the disks and hand them over to the hypervisor
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    # undo the symlinks created above before failing
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)
def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name
  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
      # whether a (soft) stop was already attempted
      self.tried_once = False
      if iname not in hyper.ListInstances():
        # first attempt is a normal stop, retries ask for a retry-stop
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
        _Fail("Failed to stop instance %s: %s", iname, err)
      self.tried_once = True
      # still running, ask utils.Retry to call us again
      raise utils.RetryAgain()
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)
    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
  _RemoveBlockDevLinks(iname, instance.disks)
1231 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1232 """Reboot an instance.
1234 @type instance: L{objects.Instance}
1235 @param instance: the instance object to reboot
1236 @type reboot_type: str
1237 @param reboot_type: the type of reboot, one of the following
1239 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1240 instance OS, do not recreate the VM
1241 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1242 restart the VM (at the hypervisor level)
1243 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1244 not accepted here, since that mode is handled differently, in
1245 cmdlib, and translates into full stop and start of the
1246 instance (instead of a call_instance_reboot RPC)
1247 @type shutdown_timeout: integer
1248 @param shutdown_timeout: maximum timeout for soft shutdown
1252 running_instances = GetInstanceList([instance.hypervisor])
# Unlike StartInstance, rebooting a stopped instance is an error.
1254 if instance.name not in running_instances:
1255 _Fail("Cannot reboot instance %s that is not running", instance.name)
1257 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1258 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
# Soft reboot: delegated entirely to the hypervisor.
1260 hyper.RebootInstance(instance)
1261 except errors.HypervisorError, err:
1262 _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1263 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
# Hard reboot: implemented as a full local shutdown + start cycle.
1265 InstanceShutdown(instance, shutdown_timeout)
1266 return StartInstance(instance, False)
1267 except errors.HypervisorError, err:
1268 _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1270 _Fail("Invalid reboot_type received: %s", reboot_type)
1273 def MigrationInfo(instance):
1274 """Gather information about an instance to be migrated.
1276 @type instance: L{objects.Instance}
1277 @param instance: the instance definition
# Hypervisor-specific migration info; opaque to this layer, it is passed
# as-is to the target node's AcceptInstance.
1280 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1282 info = hyper.MigrationInfo(instance)
1283 except errors.HypervisorError, err:
1284 _Fail("Failed to fetch migration information: %s", err, exc=True)
1288 def AcceptInstance(instance, info, target):
1289 """Prepare the node to accept an instance.
1291 @type instance: L{objects.Instance}
1292 @param instance: the instance definition
1293 @type info: string/data (opaque)
1294 @param info: migration information, from the source node
1295 @type target: string
1296 @param target: target host (usually ip), on this node
1299 # TODO: why is this required only for DTS_EXT_MIRROR?
1300 if instance.disk_template in constants.DTS_EXT_MIRROR:
1301 # Create the symlinks, as the disks are not active
1304 _GatherAndLinkBlockDevs(instance)
1305 except errors.BlockDeviceError, err:
1306 _Fail("Block device error: %s", err, exc=True)
1308 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1310 hyper.AcceptInstance(instance, info, target)
1311 except errors.HypervisorError, err:
# Roll back the symlinks created above if the hypervisor refuses the
# incoming migration (only relevant for externally-mirrored templates).
1312 if instance.disk_template in constants.DTS_EXT_MIRROR:
1313 _RemoveBlockDevLinks(instance.name, instance.disks)
1314 _Fail("Failed to accept instance: %s", err, exc=True)
1317 def FinalizeMigrationDst(instance, info, success):
1318 """Finalize any preparation to accept an instance.
1320 @type instance: L{objects.Instance}
1321 @param instance: the instance definition
1322 @type info: string/data (opaque)
1323 @param info: migration information, from the source node
1324 @type success: boolean
1325 @param success: whether the migration was a success or a failure
# Pure delegation to the hypervisor; hypervisor errors become RPC failures.
1328 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1330 hyper.FinalizeMigrationDst(instance, info, success)
1331 except errors.HypervisorError, err:
1332 _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1335 def MigrateInstance(instance, target, live):
1336 """Migrates an instance to another node.
1338 @type instance: L{objects.Instance}
1339 @param instance: the instance definition
1340 @type target: string
1341 @param target: the target node name
1343 @param live: whether the migration should be done live or not (the
1344 interpretation of this parameter is left to the hypervisor)
1345 @raise RPCFail: if migration fails for some reason
# Runs on the source node; the actual transfer is hypervisor-specific.
1348 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1351 hyper.MigrateInstance(instance, target, live)
1352 except errors.HypervisorError, err:
1353 _Fail("Failed to migrate instance: %s", err, exc=True)
1356 def FinalizeMigrationSource(instance, success, live):
1357 """Finalize the instance migration on the source node.
1359 @type instance: L{objects.Instance}
1360 @param instance: the instance definition of the migrated instance
1362 @param success: whether the migration succeeded or not
1364 @param live: whether the user requested a live migration or not
1365 @raise RPCFail: If the execution fails for some reason
1368 hyper = hypervisor.GetHypervisor(instance.hypervisor)
# Catch-all: any failure here must be reported over RPC, not propagated.
1371 hyper.FinalizeMigrationSource(instance, success, live)
1372 except Exception, err: # pylint: disable=W0703
1373 _Fail("Failed to finalize the migration on the source node: %s", err,
1377 def GetMigrationStatus(instance):
1378 """Get the migration status
1380 @type instance: L{objects.Instance}
1381 @param instance: the instance that is being migrated
1382 @rtype: L{objects.MigrationStatus}
1383 @return: the status of the current migration (one of
1384 L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1385 progress info that can be retrieved from the hypervisor
1386 @raise RPCFail: If the migration status cannot be retrieved
1389 hyper = hypervisor.GetHypervisor(instance.hypervisor)
# Catch-all: convert any hypervisor-layer failure into an RPC failure.
1391 return hyper.GetMigrationStatus(instance)
1392 except Exception, err: # pylint: disable=W0703
1393 _Fail("Failed to get migration status: %s", err, exc=True)
1396 def BlockdevCreate(disk, size, owner, on_primary, info):
1397 """Creates a block device for an instance.
1399 @type disk: L{objects.Disk}
1400 @param disk: the object describing the disk we should create
1402 @param size: the size of the physical underlying device, in MiB
1404 @param owner: the name of the instance for which disk is created,
1405 used for device cache data
1406 @type on_primary: boolean
1407 @param on_primary: indicates if it is the primary node or not
1409 @param info: string that will be sent to the physical device
1410 creation, used for example to set (LVM) tags on LVs
1412 @return: the new unique_id of the device (this can sometime be
1413 computed only after creation), or None. On secondary nodes,
1414 it's not required to return anything.
1417 # TODO: remove the obsolete "size" argument
1418 # pylint: disable=W0613
# First assemble all children (e.g. the LVs backing a DRBD device) so the
# new device can be created on top of them.
1421 for child in disk.children:
1423 crdev = _RecursiveAssembleBD(child, owner, on_primary)
1424 except errors.BlockDeviceError, err:
1425 _Fail("Can't assemble device %s: %s", child, err)
1426 if on_primary or disk.AssembleOnSecondary():
1427 # we need the children open in case the device itself has to
1430 # pylint: disable=E1103
1432 except errors.BlockDeviceError, err:
1433 _Fail("Can't make child '%s' read-write: %s", child, err)
1437 device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1438 except errors.BlockDeviceError, err:
1439 _Fail("Can't create block device: %s", err)
1441 if on_primary or disk.AssembleOnSecondary():
1444 except errors.BlockDeviceError, err:
1445 _Fail("Can't assemble device after creation, unusual event: %s", err)
1446 device.SetSyncSpeed(constants.SYNC_SPEED)
1447 if on_primary or disk.OpenOnSecondary():
1449 device.Open(force=True)
1450 except errors.BlockDeviceError, err:
1451 _Fail("Can't make device r/w after creation, unusual event: %s", err)
# Record the new device in the node-local device cache.
1452 DevCacheManager.UpdateCache(device.dev_path, owner,
1453 on_primary, disk.iv_name)
1455 device.SetInfo(info)
1457 return device.unique_id
1460 def _WipeDevice(path, offset, size):
1461 """This function actually wipes the device.
1463 @param path: The path to the device to wipe
1464 @param offset: The offset in MiB in the file
1465 @param size: The size in MiB to write
# Zero the region with dd; oflag=direct bypasses the page cache so the
# wipe hits the device itself.
1468 cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1469 "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1471 result = utils.RunCmd(cmd)
1474 _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1475 result.fail_reason, result.output)
1478 def BlockdevWipe(disk, offset, size):
1479 """Wipes a block device.
1481 @type disk: L{objects.Disk}
1482 @param disk: the disk object we want to wipe
1484 @param offset: The offset in MiB in the file
1486 @param size: The size in MiB to write
1490 rdev = _RecursiveFindBD(disk)
1491 except errors.BlockDeviceError:
1495 _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1497 # Do cross verify some of the parameters
# Both the offset and the wiped region must lie within the device.
1498 if offset > rdev.size:
1499 _Fail("Offset is bigger than device size")
1500 if (offset + size) > rdev.size:
1501 _Fail("The provided offset and size to wipe is bigger than device size")
1503 _WipeDevice(rdev.dev_path, offset, size)
1506 def BlockdevPauseResumeSync(disks, pause):
1507 """Pause or resume the sync of the block device.
1509 @type disks: list of L{objects.Disk}
1510 @param disks: the disks object we want to pause/resume
1512 @param pause: Whether to pause or resume
# One (status, error-message-or-None) tuple is accumulated per disk, so a
# failure on one disk does not abort the others.
1518 rdev = _RecursiveFindBD(disk)
1519 except errors.BlockDeviceError:
1523 success.append((False, ("Cannot change sync for device %s:"
1524 " device not found" % disk.iv_name)))
1527 result = rdev.PauseResumeSync(pause)
1530 success.append((result, None))
1536 success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1541 def BlockdevRemove(disk):
1542 """Remove a block device.
1544 @note: This is intended to be called recursively.
1546 @type disk: L{objects.Disk}
1547 @param disk: the disk object we should remove
1549 @return: the success of the operation
1554 rdev = _RecursiveFindBD(disk)
1555 except errors.BlockDeviceError, err:
1556 # probably can't attach
1557 logging.info("Can't attach to device %s in remove", disk)
1559 if rdev is not None:
1560 r_path = rdev.dev_path
# Errors are collected into `msgs` instead of aborting, so children are
# still processed; all messages are reported together at the end.
1563 except errors.BlockDeviceError, err:
1564 msgs.append(str(err))
1566 DevCacheManager.RemoveCache(r_path)
# Recurse into the children (e.g. backing LVs of a DRBD device).
1569 for child in disk.children:
1571 BlockdevRemove(child)
1572 except RPCFail, err:
1573 msgs.append(str(err))
1576 _Fail("; ".join(msgs))
1579 def _RecursiveAssembleBD(disk, owner, as_primary):
1580 """Activate a block device for an instance.
1582 This is run on the primary and secondary nodes for an instance.
1584 @note: this function is called recursively.
1586 @type disk: L{objects.Disk}
1587 @param disk: the disk we try to assemble
1589 @param owner: the name of the instance which owns the disk
1590 @type as_primary: boolean
1591 @param as_primary: if we should make the block device
1594 @return: the assembled device or None (in case no device
1596 @raise errors.BlockDeviceError: in case there is an error
1597 during the activation of the children or the device
# mcn = the maximum number of children that may fail to assemble while the
# device is still usable (e.g. a degraded mirror).
1603 mcn = disk.ChildrenNeeded()
1605 mcn = 0 # max number of Nones allowed
1607 mcn = len(disk.children) - mcn # max number of Nones
1608 for chld_disk in disk.children:
1610 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1611 except errors.BlockDeviceError, err:
# Too many children already failed: give up. Otherwise log and continue
# with a None placeholder for this child.
1612 if children.count(None) >= mcn:
1615 logging.error("Error in child activation (but continuing): %s",
1617 children.append(cdev)
1619 if as_primary or disk.AssembleOnSecondary():
1620 r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1621 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1623 if as_primary or disk.OpenOnSecondary():
1625 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1626 as_primary, disk.iv_name)
1633 def BlockdevAssemble(disk, owner, as_primary, idx):
1634 """Activate a block device for an instance.
1636 This is a wrapper over _RecursiveAssembleBD.
1638 @rtype: str or boolean
1639 @return: a C{/dev/...} path for primary nodes, and
1640 C{True} for secondary nodes
1644 result = _RecursiveAssembleBD(disk, owner, as_primary)
1645 if isinstance(result, bdev.BlockDev):
1646 # pylint: disable=E1103
# On the primary, expose the device path and create the stable
# per-instance symlink for disk index `idx`.
1647 result = result.dev_path
1649 _SymlinkBlockDev(owner, result, idx)
1650 except errors.BlockDeviceError, err:
1651 _Fail("Error while assembling disk: %s", err, exc=True)
1652 except OSError, err:
1653 _Fail("Error while symlinking disk: %s", err, exc=True)
1658 def BlockdevShutdown(disk):
1659 """Shut down a block device.
1661 First, if the device is assembled (Attach() is successful), then
1662 the device is shutdown. Then the children of the device are
1665 This function is called recursively. Note that we don't cache the
1666 children or such, as opposed to assemble, shutdown of different
1667 devices doesn't require that the upper device was active.
1669 @type disk: L{objects.Disk}
1670 @param disk: the description of the disk we should
1676 r_dev = _RecursiveFindBD(disk)
1677 if r_dev is not None:
1678 r_path = r_dev.dev_path
# Errors are accumulated in `msgs` (and reported together via _Fail) so
# that child shutdown is still attempted after a parent failure.
1681 DevCacheManager.RemoveCache(r_path)
1682 except errors.BlockDeviceError, err:
1683 msgs.append(str(err))
1686 for child in disk.children:
1688 BlockdevShutdown(child)
1689 except RPCFail, err:
1690 msgs.append(str(err))
1693 _Fail("; ".join(msgs))
1696 def BlockdevAddchildren(parent_cdev, new_cdevs):
1697 """Extend a mirrored block device.
1699 @type parent_cdev: L{objects.Disk}
1700 @param parent_cdev: the disk to which we should add children
1701 @type new_cdevs: list of L{objects.Disk}
1702 @param new_cdevs: the list of children which we should add
1706 parent_bdev = _RecursiveFindBD(parent_cdev)
1707 if parent_bdev is None:
1708 _Fail("Can't find parent device '%s' in add children", parent_cdev)
# Every new child must resolve to an active device before any is added.
1709 new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1710 if new_bdevs.count(None) > 0:
1711 _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1712 parent_bdev.AddChildren(new_bdevs)
1715 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1716 """Shrink a mirrored block device.
1718 @type parent_cdev: L{objects.Disk}
1719 @param parent_cdev: the disk from which we should remove children
1720 @type new_cdevs: list of L{objects.Disk}
1721 @param new_cdevs: the list of children which we should remove
1725 parent_bdev = _RecursiveFindBD(parent_cdev)
1726 if parent_bdev is None:
1727 _Fail("Can't find parent device '%s' in remove children", parent_cdev)
# Resolve each child to a device path: prefer the static path when the
# disk type provides one, otherwise look up the active device.
1729 for disk in new_cdevs:
1730 rpath = disk.StaticDevPath()
1732 bd = _RecursiveFindBD(disk)
1734 _Fail("Can't find device %s while removing children", disk)
1736 devs.append(bd.dev_path)
# Sanity check: a static path must be a normalized absolute path.
1738 if not utils.IsNormAbsPath(rpath):
1739 _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1741 parent_bdev.RemoveChildren(devs)
1744 def BlockdevGetmirrorstatus(disks):
1745 """Get the mirroring status of a list of devices.
1747 @type disks: list of L{objects.Disk}
1748 @param disks: the list of disks which we should query
1750 @return: List of L{objects.BlockDevStatus}, one for each disk
1751 @raise errors.BlockDeviceError: if any of the disks cannot be
# All-or-nothing variant: any disk that cannot be found fails the whole
# call (contrast with BlockdevGetmirrorstatusMulti below).
1757 rbd = _RecursiveFindBD(dsk)
1759 _Fail("Can't find device %s", dsk)
1761 stats.append(rbd.CombinedSyncStatus())
1766 def BlockdevGetmirrorstatusMulti(disks):
1767 """Get the mirroring status of a list of devices.
1769 @type disks: list of L{objects.Disk}
1770 @param disks: the list of disks which we should query
1772 @return: List of tuples, (bool, status), one for each disk; bool denotes
1773 success/failure, status is L{objects.BlockDevStatus} on success, string
# Per-disk variant: failures are reported per entry instead of failing the
# whole RPC (contrast with BlockdevGetmirrorstatus above).
1780 rbd = _RecursiveFindBD(disk)
1782 result.append((False, "Can't find device %s" % disk))
1785 status = rbd.CombinedSyncStatus()
1786 except errors.BlockDeviceError, err:
1787 logging.exception("Error while getting disk status")
1788 result.append((False, str(err)))
1790 result.append((True, status))
# Invariant: exactly one result entry per input disk, in order.
1792 assert len(disks) == len(result)
1797 def _RecursiveFindBD(disk):
1798 """Check if a device is activated.
1800 If so, return information about the real device.
1802 @type disk: L{objects.Disk}
1803 @param disk: the disk object we need to find
1805 @return: None if the device can't be found,
1806 otherwise the device instance
# Resolve the children first (depth-first), then look up this device.
1811 for chdisk in disk.children:
1812 children.append(_RecursiveFindBD(chdisk))
1814 return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1817 def _OpenRealBD(disk):
1818 """Opens the underlying block device of a disk.
1820 @type disk: L{objects.Disk}
1821 @param disk: the disk object we want to open
1824 real_disk = _RecursiveFindBD(disk)
# The disk must already be assembled; this helper does not activate it.
1825 if real_disk is None:
1826 _Fail("Block device '%s' is not set up", disk)
1833 def BlockdevFind(disk):
1834 """Check if a device is activated.
1836 If it is, return information about the real device.
1838 @type disk: L{objects.Disk}
1839 @param disk: the disk to find
1840 @rtype: None or objects.BlockDevStatus
1841 @return: None if the disk cannot be found, otherwise a the current
1846 rbd = _RecursiveFindBD(disk)
1847 except errors.BlockDeviceError, err:
1848 _Fail("Failed to find device: %s", err, exc=True)
# Unlike CombinedSyncStatus (used by the mirror-status RPCs), this returns
# only the status of this device, not of its children.
1853 return rbd.GetSyncStatus()
1856 def BlockdevGetsize(disks):
1857 """Computes the size of the given disks.
1859 If a disk is not found, returns None instead.
1861 @type disks: list of L{objects.Disk}
1862 @param disks: the list of disk to compute the size for
1864 @return: list with elements None if the disk cannot be found,
# One result entry per disk; lookup failures yield None, not an RPC error.
1871 rbd = _RecursiveFindBD(cf)
1872 except errors.BlockDeviceError:
1878 result.append(rbd.GetActualSize())
1882 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1883 """Export a block device to a remote node.
1885 @type disk: L{objects.Disk}
1886 @param disk: the description of the disk to export
1887 @type dest_node: str
1888 @param dest_node: the destination node to export to
1889 @type dest_path: str
1890 @param dest_path: the destination path on the target node
1891 @type cluster_name: str
1892 @param cluster_name: the cluster name, needed for SSH hostalias
1896 real_disk = _OpenRealBD(disk)
1898 # the block size on the read dd is 1MiB to match our units
1899 expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1900 "dd if=%s bs=1048576 count=%s",
1901 real_disk.dev_path, str(disk.size))
1903 # we set here a smaller block size as, due to ssh buffering, more
1904 # than 64-128k will mostly ignored; we use nocreat to fail if the
1905 # device is not already there or we pass a wrong path; we use
1906 # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1907 # to not buffer too much memory; this means that at best, we flush
1908 # every 64k, which will not be very fast
1909 destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1910 " oflag=dsync", dest_path)
# Wrap the receiving dd in an ssh invocation to the destination node.
1912 remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1913 constants.GANETI_RUNAS,
1916 # all commands have been checked, so we're safe to combine them
1917 command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
# bash (not sh) is required for `set -o pipefail` in expcmd above.
1919 result = utils.RunCmd(["bash", "-c", command])
1922 _Fail("Disk copy command '%s' returned error: %s"
1923 " output: %s", command, result.fail_reason, result.output)
1926 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1927 """Write a file to the filesystem.
1929 This allows the master to overwrite(!) a file. It will only perform
1930 the operation if the file belongs to a list of configuration files.
1932 @type file_name: str
1933 @param file_name: the target file name
1935 @param data: the new contents of the file
1937 @param mode: the mode to give the file (can be None)
1939 @param uid: the owner of the file
1941 @param gid: the group of the file
1943 @param atime: the atime to set on the file (can be None)
1945 @param mtime: the mtime to set on the file (can be None)
# Security checks: only absolute paths from the _ALLOWED_UPLOAD_FILES
# whitelist may be overwritten by the master.
1949 if not os.path.isabs(file_name):
1950 _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1952 if file_name not in _ALLOWED_UPLOAD_FILES:
1953 _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
# The payload arrives compressed over the wire.
1956 raw_data = _Decompress(data)
# uid/gid are received as names and resolved locally to numeric ids.
1958 if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
1959 _Fail("Invalid username/groupname type")
1961 getents = runtime.GetEnts()
1962 uid = getents.LookupUser(uid)
1963 gid = getents.LookupGroup(gid)
1965 utils.SafeWriteFile(file_name, None,
1966 data=raw_data, mode=mode, uid=uid, gid=gid,
1967 atime=atime, mtime=mtime)
1970 def RunOob(oob_program, command, node, timeout):
1971 """Executes oob_program with given command on given node.
1973 @param oob_program: The path to the executable oob_program
1974 @param command: The command to invoke on oob_program
1975 @param node: The node given as an argument to the program
1976 @param timeout: Timeout after which we kill the oob program
1979 @raise RPCFail: If execution fails for some reason
# The out-of-band helper is killed by RunCmd once `timeout` expires.
1982 result = utils.RunCmd([oob_program, command, node], timeout=timeout)
1985 _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
1986 result.fail_reason, result.output)
1988 return result.stdout
1991 def WriteSsconfFiles(values):
1992 """Update all ssconf files.
1994 Wrapper around the SimpleStore.WriteFiles.
1997 ssconf.SimpleStore().WriteFiles(values)
2000 def _ErrnoOrStr(err):
2001 """Format an EnvironmentError exception.
2003 If the L{err} argument has an errno attribute, it will be looked up
2004 and converted into a textual C{E...} description. Otherwise the
2005 string representation of the error will be returned.
2007 @type err: L{EnvironmentError}
2008 @param err: the exception to format
# Map the numeric errno to its symbolic name, e.g. 2 -> "ENOENT".
2011 if hasattr(err, "errno"):
2012 detail = errno.errorcode[err.errno]
2018 def _OSOndiskAPIVersion(os_dir):
2019 """Compute and return the API version of a given OS.
2021 This function will try to read the API version of the OS residing in
2022 the 'os_dir' directory.
2025 @param os_dir: the directory in which we should look for the OS
2027 @return: tuple (status, data) with status denoting the validity and
2028 data holding either the valid versions or an error message
2031 api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
# The API file must exist and be a regular file before it is read.
2034 st = os.stat(api_file)
2035 except EnvironmentError, err:
2036 return False, ("Required file '%s' not found under path %s: %s" %
2037 (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
2039 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2040 return False, ("File '%s' in %s is not a regular file" %
2041 (constants.OS_API_FILE, os_dir))
2044 api_versions = utils.ReadFile(api_file).splitlines()
2045 except EnvironmentError, err:
2046 return False, ("Error while reading the API version file at %s: %s" %
2047 (api_file, _ErrnoOrStr(err)))
# One integer version per line; anything non-numeric invalidates the OS.
2050 api_versions = [int(version.strip()) for version in api_versions]
2051 except (TypeError, ValueError), err:
2052 return False, ("API version(s) can't be converted to integer: %s" %
2055 return True, api_versions
2058 def DiagnoseOS(top_dirs=None):
2059 """Compute the validity for all OSes.
2061 @type top_dirs: list
2062 @param top_dirs: the list of directories in which to
2063 search (if not given defaults to
2064 L{constants.OS_SEARCH_PATH})
2065 @rtype: list of L{objects.OS}
2066 @return: a list of tuples (name, path, status, diagnose, variants,
2067 parameters, api_version) for all (potential) OSes under all
2068 search paths, where:
2069 - name is the (potential) OS name
2070 - path is the full path to the OS
2071 - status True/False is the validity of the OS
2072 - diagnose is the error message for an invalid OS, otherwise empty
2073 - variants is a list of supported OS variants, if any
2074 - parameters is a list of (name, help) parameters, if any
2075 - api_version is a list of support OS API versions
2078 if top_dirs is None:
2079 top_dirs = constants.OS_SEARCH_PATH
# Scan every search directory; unreadable directories are logged and
# skipped rather than failing the whole diagnosis.
2082 for dir_name in top_dirs:
2083 if os.path.isdir(dir_name):
2085 f_names = utils.ListVisibleFiles(dir_name)
2086 except EnvironmentError, err:
2087 logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2089 for name in f_names:
2090 os_path = utils.PathJoin(dir_name, name)
# Each candidate entry is probed; an invalid OS still produces a result
# tuple, carrying the diagnose message and empty capability lists.
2091 status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2094 variants = os_inst.supported_variants
2095 parameters = os_inst.supported_parameters
2096 api_versions = os_inst.api_versions
2099 variants = parameters = api_versions = []
2100 result.append((name, os_path, status, diagnose, variants,
2101 parameters, api_versions))
2106 def _TryOSFromDisk(name, base_dir=None):
2107 """Create an OS instance from disk.
2109 This function will return an OS instance if the given name is a
2112 @type base_dir: string
2113 @keyword base_dir: Base directory containing OS installations.
2114 Defaults to a search in all the OS_SEARCH_PATH dirs.
2116 @return: success and either the OS instance if we find a valid one,
2120 if base_dir is None:
2121 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
2123 os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2126 return False, "Directory for OS %s not found in search path" % name
2128 status, api_versions = _OSOndiskAPIVersion(os_dir)
2131 return status, api_versions
# The node must support at least one of the OS's declared API versions.
2133 if not constants.OS_API_VERSIONS.intersection(api_versions):
2134 return False, ("API version mismatch for path '%s': found %s, want %s." %
2135 (os_dir, api_versions, constants.OS_API_VERSIONS))
2137 # OS Files dictionary, we will populate it with the absolute path
2138 # names; if the value is True, then it is a required file, otherwise
2140 os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
# Newer API versions add optional/required auxiliary files.
2142 if max(api_versions) >= constants.OS_API_V15:
2143 os_files[constants.OS_VARIANTS_FILE] = False
2145 if max(api_versions) >= constants.OS_API_V20:
2146 os_files[constants.OS_PARAMETERS_FILE] = True
2148 del os_files[constants.OS_SCRIPT_VERIFY]
# Validate each expected file: it must exist (unless optional), be a
# regular file, and scripts must be executable by the owner.
2150 for (filename, required) in os_files.items():
2151 os_files[filename] = utils.PathJoin(os_dir, filename)
2154 st = os.stat(os_files[filename])
2155 except EnvironmentError, err:
2156 if err.errno == errno.ENOENT and not required:
2157 del os_files[filename]
2159 return False, ("File '%s' under path '%s' is missing (%s)" %
2160 (filename, os_dir, _ErrnoOrStr(err)))
2162 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2163 return False, ("File '%s' under path '%s' is not a regular file" %
2166 if filename in constants.OS_SCRIPTS:
2167 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2168 return False, ("File '%s' under path '%s' is not executable" %
2172 if constants.OS_VARIANTS_FILE in os_files:
2173 variants_file = os_files[constants.OS_VARIANTS_FILE]
2175 variants = utils.ReadFile(variants_file).splitlines()
2176 except EnvironmentError, err:
2177 # we accept missing files, but not other errors
2178 if err.errno != errno.ENOENT:
2179 return False, ("Error while reading the OS variants file at %s: %s" %
2180 (variants_file, _ErrnoOrStr(err)))
2183 if constants.OS_PARAMETERS_FILE in os_files:
2184 parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2186 parameters = utils.ReadFile(parameters_file).splitlines()
2187 except EnvironmentError, err:
2188 return False, ("Error while reading the OS parameters file at %s: %s" %
2189 (parameters_file, _ErrnoOrStr(err)))
# Each parameters line is "name help-text", split on first whitespace.
2190 parameters = [v.split(None, 1) for v in parameters]
2192 os_obj = objects.OS(name=name, path=os_dir,
2193 create_script=os_files[constants.OS_SCRIPT_CREATE],
2194 export_script=os_files[constants.OS_SCRIPT_EXPORT],
2195 import_script=os_files[constants.OS_SCRIPT_IMPORT],
2196 rename_script=os_files[constants.OS_SCRIPT_RENAME],
2197 verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2199 supported_variants=variants,
2200 supported_parameters=parameters,
2201 api_versions=api_versions)
2205 def OSFromDisk(name, base_dir=None):
2206 """Create an OS instance from disk.
2208 This function will return an OS instance if the given name is a
2209 valid OS name. Otherwise, it will raise an appropriate
2210 L{RPCFail} exception, detailing why this is not a valid OS.
2212 This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2213 an exception but returns true/false status data.
2215 @type base_dir: string
2216 @keyword base_dir: Base directory containing OS installations.
2217 Defaults to a search in all the OS_SEARCH_PATH dirs.
2218 @rtype: L{objects.OS}
2219 @return: the OS instance if we find a valid one
2220 @raise RPCFail: if we don't find a valid OS
# Strip any "+variant" suffix before the on-disk lookup.
2223 name_only = objects.OS.GetName(name)
2224 status, payload = _TryOSFromDisk(name_only, base_dir)
2232 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2233 """Calculate the basic environment for an os script.
2236 @param os_name: full operating system name (including variant)
2237 @type inst_os: L{objects.OS}
2238 @param inst_os: operating system for which the environment is being built
2239 @type os_params: dict
2240 @param os_params: the OS parameters
2241 @type debug: integer
2242 @param debug: debug level (0 or 1, for OS Api 10)
2244 @return: dict of environment variables
2245 @raise errors.BlockDeviceError: if the block device
# Advertise the highest API version supported by both node and OS.
2251 max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2252 result["OS_API_VERSION"] = "%d" % api_version
2253 result["OS_NAME"] = inst_os.name
2254 result["DEBUG_LEVEL"] = "%d" % debug
# OS variants exist only from API v15 on; fall back to the first
# supported variant when none is encoded in os_name.
2257 if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2258 variant = objects.OS.GetVariant(os_name)
2260 variant = inst_os.supported_variants[0]
2263 result["OS_VARIANT"] = variant
# Expose every OS parameter as an OSP_<NAME> variable.
2266 for pname, pvalue in os_params.items():
2267 result["OSP_%s" % pname.upper()] = pvalue
2272 def OSEnvironment(instance, inst_os, debug=0):
2273 """Calculate the environment for an os script.
2275 @type instance: L{objects.Instance}
2276 @param instance: target instance for the os script run
2277 @type inst_os: L{objects.OS}
2278 @param inst_os: operating system for which the environment is being built
2279 @type debug: integer
2280 @param debug: debug level (0 or 1, for OS Api 10)
2282 @return: dict of environment variables
2283 @raise errors.BlockDeviceError: if the block device
# Start from the OS-level environment and layer instance data on top.
2287 result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2289 for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2290 result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2292 result["HYPERVISOR"] = instance.hypervisor
2293 result["DISK_COUNT"] = "%d" % len(instance.disks)
2294 result["NIC_COUNT"] = "%d" % len(instance.nics)
2295 result["INSTANCE_SECONDARY_NODES"] = \
2296 ("%s" % " ".join(instance.secondary_nodes))
# Per-disk variables; _OpenRealBD requires the disks to be assembled.
2299 for idx, disk in enumerate(instance.disks):
2300 real_disk = _OpenRealBD(disk)
2301 result["DISK_%d_PATH" % idx] = real_disk.dev_path
2302 result["DISK_%d_ACCESS" % idx] = disk.mode
2303 if constants.HV_DISK_TYPE in instance.hvparams:
2304 result["DISK_%d_FRONTEND_TYPE" % idx] = \
2305 instance.hvparams[constants.HV_DISK_TYPE]
2306 if disk.dev_type in constants.LDS_BLOCK:
2307 result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2308 elif disk.dev_type == constants.LD_FILE:
2309 result["DISK_%d_BACKEND_TYPE" % idx] = \
2310 "file:%s" % disk.physical_id[0]
# Per-NIC variables.
2313 for idx, nic in enumerate(instance.nics):
2314 result["NIC_%d_MAC" % idx] = nic.mac
2316 result["NIC_%d_IP" % idx] = nic.ip
2317 result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2318 if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2319 result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2320 if nic.nicparams[constants.NIC_LINK]:
2321 result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2322 if constants.HV_NIC_TYPE in instance.hvparams:
2323 result["NIC_%d_FRONTEND_TYPE" % idx] = \
2324 instance.hvparams[constants.HV_NIC_TYPE]
# Flatten backend and hypervisor parameters into the environment too.
2327 for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2328 for key, value in source.items():
2329 result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2334 def BlockdevGrow(disk, amount, dryrun):
2335 """Grow a stack of block devices.
2337 This function is called recursively, with the childrens being the
2338 first ones to resize.
2340 @type disk: L{objects.Disk}
2341 @param disk: the disk to be grown
2342 @type amount: integer
2343 @param amount: the amount (in mebibytes) to grow with
2344 @type dryrun: boolean
2345 @param dryrun: whether to execute the operation in simulation mode
2346 only, without actually increasing the size
2347 @rtype: (status, result)
2348 @return: a tuple with the status of the operation (True/False), and
2349 the errors message if status is False
2352 r_dev = _RecursiveFindBD(disk)
2354 _Fail("Cannot find block device %s", disk)
2357 r_dev.Grow(amount, dryrun)
2358 except errors.BlockDeviceError, err:
2359 _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    # DRBD itself cannot be snapshotted; recurse into the LVM backing device
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  # write the export into a ".new" directory first, then atomically move
  # it into place, so a failed export never leaves a half-written config
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    # snap_disks entries can be None for disks that were not snapshotted
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
  else:
    _Fail("No exports directory")
2509 def RemoveExport(export):
2510 """Remove an existing export from the node.
2513 @param export: the name of the export to remove
2517 target = utils.PathJoin(constants.EXPORT_DIR, export)
2520 shutil.rmtree(target)
2521 except EnvironmentError, err:
2522 _Fail("Error while removing the export: %s", err, exc=True)
2525 def BlockdevRename(devlist):
2526 """Rename a list of block devices.
2528 @type devlist: list of tuples
2529 @param devlist: list of tuples of the form (disk,
2530 new_logical_id, new_physical_id); disk is an
2531 L{objects.Disk} object describing the current disk,
2532 and new logical_id/physical_id is the name we
2535 @return: True if all renames succeeded, False otherwise
2540 for disk, unique_id in devlist:
2541 dev = _RecursiveFindBD(disk)
2543 msgs.append("Can't find device %s in rename" % str(disk))
2547 old_rpath = dev.dev_path
2548 dev.Rename(unique_id)
2549 new_rpath = dev.dev_path
2550 if old_rpath != new_rpath:
2551 DevCacheManager.RemoveCache(old_rpath)
2552 # FIXME: we should add the new cache information here, like:
2553 # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2554 # but we don't have the owner here - maybe parse from existing
2555 # cache? for now, we only lose lvm data when we rename, which
2556 # is less critical than DRBD or MD
2557 except errors.BlockDeviceError, err:
2558 msgs.append("Can't rename device '%s' to '%s': %s" %
2559 (dev, unique_id, err))
2560 logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2563 _Fail("; ".join(msgs))
def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  fs_dir = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
          utils.IsBelowDir(base_shared, fs_dir)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          fs_dir, base_fstore, base_shared)
  return fs_dir
2593 def CreateFileStorageDir(file_storage_dir):
2594 """Create file storage directory.
2596 @type file_storage_dir: str
2597 @param file_storage_dir: directory to create
2600 @return: tuple with first element a boolean indicating wheter dir
2601 creation was successful or not
2604 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2605 if os.path.exists(file_storage_dir):
2606 if not os.path.isdir(file_storage_dir):
2607 _Fail("Specified storage dir '%s' is not a directory",
2611 os.makedirs(file_storage_dir, 0750)
2612 except OSError, err:
2613 _Fail("Cannot create file storage directory '%s': %s",
2614 file_storage_dir, err, exc=True)
2617 def RemoveFileStorageDir(file_storage_dir):
2618 """Remove file storage directory.
2620 Remove it only if it's empty. If not log an error and return.
2622 @type file_storage_dir: str
2623 @param file_storage_dir: the directory we should cleanup
2624 @rtype: tuple (success,)
2625 @return: tuple of one element, C{success}, denoting
2626 whether the operation was successful
2629 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2630 if os.path.exists(file_storage_dir):
2631 if not os.path.isdir(file_storage_dir):
2632 _Fail("Specified Storage directory '%s' is not a directory",
2634 # deletes dir only if empty, otherwise we want to fail the rpc call
2636 os.rmdir(file_storage_dir)
2637 except OSError, err:
2638 _Fail("Cannot remove file storage directory '%s': %s",
2639 file_storage_dir, err)
2642 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2643 """Rename the file storage directory.
2645 @type old_file_storage_dir: str
2646 @param old_file_storage_dir: the current path
2647 @type new_file_storage_dir: str
2648 @param new_file_storage_dir: the name we should rename to
2649 @rtype: tuple (success,)
2650 @return: tuple of one element, C{success}, denoting
2651 whether the operation was successful
2654 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2655 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2656 if not os.path.exists(new_file_storage_dir):
2657 if os.path.isdir(old_file_storage_dir):
2659 os.rename(old_file_storage_dir, new_file_storage_dir)
2660 except OSError, err:
2661 _Fail("Cannot rename '%s' to '%s': %s",
2662 old_file_storage_dir, new_file_storage_dir, err)
2664 _Fail("Specified storage dir '%s' is not a directory",
2665 old_file_storage_dir)
2667 if os.path.exists(old_file_storage_dir):
2668 _Fail("Cannot rename '%s' to '%s': both locations exist",
2669 old_file_storage_dir, new_file_storage_dir)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  # both source and destination must stay within the queue directory
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)
2730 def BlockdevClose(instance_name, disks):
2731 """Closes the given block devices.
2733 This means they will be switched to secondary mode (in case of
2736 @param instance_name: if the argument is not empty, the symlinks
2737 of this instance will be removed
2738 @type disks: list of L{objects.Disk}
2739 @param disks: the list of disks to be closed
2740 @rtype: tuple (success, message)
2741 @return: a tuple of success and message, where success
2742 indicates the succes of the operation, and message
2743 which will contain the error details in case we
2749 rd = _RecursiveFindBD(cf)
2751 _Fail("Can't find device %s", cf)
2758 except errors.BlockDeviceError, err:
2759 msg.append(str(err))
2761 _Fail("Can't make devices secondary: %s", ",".join(msg))
2764 _RemoveBlockDevLinks(instance_name, disks)
2767 def ValidateHVParams(hvname, hvparams):
2768 """Validates the given hypervisor parameters.
2770 @type hvname: string
2771 @param hvname: the hypervisor name
2772 @type hvparams: dict
2773 @param hvparams: the hypervisor parameters to be validated
2778 hv_type = hypervisor.GetHypervisor(hvname)
2779 hv_type.ValidateParameters(hvparams)
2780 except errors.HypervisorError, err:
2781 _Fail(str(err), log=False)
def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  # older OS API versions don't have a validation script
  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True
2849 """Demotes the current node from master candidate role.
2852 # try to ensure we're not the master by mistake
2853 master, myself = ssconf.GetMasterAndMyself()
2854 if master == myself:
2855 _Fail("ssconf status shows I'm the master node, will not demote")
2857 result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2858 if not result.failed:
2859 _Fail("The master daemon is running, will not demote")
2862 if os.path.isfile(constants.CLUSTER_CONF_FILE):
2863 utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2864 except EnvironmentError, err:
2865 if err.errno != errno.ENOENT:
2866 _Fail("Error while backing up cluster file: %s", err, exc=True)
2868 utils.RemoveFile(constants.CLUSTER_CONF_FILE)
def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  @type cryptodir: string
  @param cryptodir: base directory holding per-certificate subdirectories
  @type name: string
  @param name: certificate (subdirectory) name
  @rtype: tuple; (string, string, string)
  @return: directory, key file path and certificate file path

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2880 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2881 """Creates a new X509 certificate for SSL/TLS.
2884 @param validity: Validity in seconds
2885 @rtype: tuple; (string, string)
2886 @return: Certificate name and public part
2889 (key_pem, cert_pem) = \
2890 utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2891 min(validity, _MAX_SSL_CERT_VALIDITY))
2893 cert_dir = tempfile.mkdtemp(dir=cryptodir,
2894 prefix="x509-%s-" % utils.TimestampForFilename())
2896 name = os.path.basename(cert_dir)
2897 assert len(name) > 5
2899 (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2901 utils.WriteFile(key_file, mode=0400, data=key_pem)
2902 utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2904 # Never return private key as it shouldn't leave the node
2905 return (name, cert_pem)
2907 shutil.rmtree(cert_dir, ignore_errors=True)
2911 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2912 """Removes a X509 certificate.
2915 @param name: Certificate name
2918 (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2920 utils.RemoveFile(key_file)
2921 utils.RemoveFile(cert_file)
2925 except EnvironmentError, err:
2926 _Fail("Cannot remove certificate directory '%s': %s",
2930 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2931 """Returns the command for the requested input/output.
2933 @type instance: L{objects.Instance}
2934 @param instance: The instance object
2935 @param mode: Import/export mode
2936 @param ieio: Input/output type
2937 @param ieargs: Input/output arguments
2940 assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2947 if ieio == constants.IEIO_FILE:
2948 (filename, ) = ieargs
2950 if not utils.IsNormAbsPath(filename):
2951 _Fail("Path '%s' is not normalized or absolute", filename)
2953 real_filename = os.path.realpath(filename)
2954 directory = os.path.dirname(real_filename)
2956 if not utils.IsBelowDir(constants.EXPORT_DIR, real_filename):
2957 _Fail("File '%s' is not under exports directory '%s': %s",
2958 filename, constants.EXPORT_DIR, real_filename)
2961 utils.Makedirs(directory, mode=0750)
2963 quoted_filename = utils.ShellQuote(filename)
2965 if mode == constants.IEM_IMPORT:
2966 suffix = "> %s" % quoted_filename
2967 elif mode == constants.IEM_EXPORT:
2968 suffix = "< %s" % quoted_filename
2970 # Retrieve file size
2972 st = os.stat(filename)
2973 except EnvironmentError, err:
2974 logging.error("Can't stat(2) %s: %s", filename, err)
2976 exp_size = utils.BytesToMebibyte(st.st_size)
2978 elif ieio == constants.IEIO_RAW_DISK:
2981 real_disk = _OpenRealBD(disk)
2983 if mode == constants.IEM_IMPORT:
2984 # we set here a smaller block size as, due to transport buffering, more
2985 # than 64-128k will mostly ignored; we use nocreat to fail if the device
2986 # is not already there or we pass a wrong path; we use notrunc to no
2987 # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2988 # much memory; this means that at best, we flush every 64k, which will
2990 suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2991 " bs=%s oflag=dsync"),
2995 elif mode == constants.IEM_EXPORT:
2996 # the block size on the read dd is 1MiB to match our units
2997 prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2999 str(1024 * 1024), # 1 MB
3001 exp_size = disk.size
3003 elif ieio == constants.IEIO_SCRIPT:
3004 (disk, disk_index, ) = ieargs
3006 assert isinstance(disk_index, (int, long))
3008 real_disk = _OpenRealBD(disk)
3010 inst_os = OSFromDisk(instance.os)
3011 env = OSEnvironment(instance, inst_os)
3013 if mode == constants.IEM_IMPORT:
3014 env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3015 env["IMPORT_INDEX"] = str(disk_index)
3016 script = inst_os.import_script
3018 elif mode == constants.IEM_EXPORT:
3019 env["EXPORT_DEVICE"] = real_disk.dev_path
3020 env["EXPORT_INDEX"] = str(disk_index)
3021 script = inst_os.export_script
3023 # TODO: Pass special environment only to script
3024 script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3026 if mode == constants.IEM_IMPORT:
3027 suffix = "| %s" % script_cmd
3029 elif mode == constants.IEM_EXPORT:
3030 prefix = "%s |" % script_cmd
3032 # Let script predict size
3033 exp_size = constants.IE_CUSTOM_SIZE
3036 _Fail("Invalid %s I/O mode %r", mode, ieio)
3038 return (env, prefix, suffix, exp_size)
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  @type prefix: string
  @param prefix: directory name prefix (mode and component)
  @rtype: string
  @return: path of the newly created status directory

  """
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))
3050 def StartImportExportDaemon(mode, opts, host, port, instance, component,
3052 """Starts an import or export daemon.
3054 @param mode: Import/output mode
3055 @type opts: L{objects.ImportExportOptions}
3056 @param opts: Daemon options
3058 @param host: Remote host for export (None for import)
3060 @param port: Remote port for export (None for import)
3061 @type instance: L{objects.Instance}
3062 @param instance: Instance object
3063 @type component: string
3064 @param component: which part of the instance is transferred now,
3066 @param ieio: Input/output type
3067 @param ieioargs: Input/output arguments
3070 if mode == constants.IEM_IMPORT:
3073 if not (host is None and port is None):
3074 _Fail("Can not specify host or port on import")
3076 elif mode == constants.IEM_EXPORT:
3079 if host is None or port is None:
3080 _Fail("Host and port must be specified for an export")
3083 _Fail("Invalid mode %r", mode)
3085 if (opts.key_name is None) ^ (opts.ca_pem is None):
3086 _Fail("Cluster certificate can only be used for both key and CA")
3088 (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3089 _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3091 if opts.key_name is None:
3093 key_path = constants.NODED_CERT_FILE
3094 cert_path = constants.NODED_CERT_FILE
3095 assert opts.ca_pem is None
3097 (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
3099 assert opts.ca_pem is not None
3101 for i in [key_path, cert_path]:
3102 if not os.path.exists(i):
3103 _Fail("File '%s' does not exist" % i)
3105 status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3107 status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3108 pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3109 ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3111 if opts.ca_pem is None:
3113 ca = utils.ReadFile(constants.NODED_CERT_FILE)
3118 utils.WriteFile(ca_file, data=ca, mode=0400)
3121 constants.IMPORT_EXPORT_DAEMON,
3123 "--key=%s" % key_path,
3124 "--cert=%s" % cert_path,
3125 "--ca=%s" % ca_file,
3129 cmd.append("--host=%s" % host)
3132 cmd.append("--port=%s" % port)
3135 cmd.append("--ipv6")
3137 cmd.append("--ipv4")
3140 cmd.append("--compress=%s" % opts.compress)
3143 cmd.append("--magic=%s" % opts.magic)
3145 if exp_size is not None:
3146 cmd.append("--expected-size=%s" % exp_size)
3149 cmd.append("--cmd-prefix=%s" % cmd_prefix)
3152 cmd.append("--cmd-suffix=%s" % cmd_suffix)
3154 if mode == constants.IEM_EXPORT:
3155 # Retry connection a few times when connecting to remote peer
3156 cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3157 cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3158 elif opts.connect_timeout is not None:
3159 assert mode == constants.IEM_IMPORT
3160 # Overall timeout for establishing connection while listening
3161 cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3163 logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3165 # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3166 # support for receiving a file descriptor for output
3167 utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3170 # The import/export name is simply the status directory name
3171 return os.path.basename(status_dir)
3174 shutil.rmtree(status_dir, ignore_errors=True)
3178 def GetImportExportStatus(names):
3179 """Returns import/export daemon status.
3181 @type names: sequence
3182 @param names: List of names
3183 @rtype: List of dicts
3184 @return: Returns a list of the state of each named import/export or None if a
3185 status couldn't be read
3191 status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
3195 data = utils.ReadFile(status_file)
3196 except EnvironmentError, err:
3197 if err.errno != errno.ENOENT:
3205 result.append(serializer.LoadJson(data))
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  @type name: string
  @param name: name (status directory) of the import/export

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    # ignore the race where the daemon exits before the signal arrives
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  @type name: string
  @param name: name (status directory) of the import/export

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @param nodes_ip: mapping of node names to IP addresses
  @type disks: list of L{objects.Disk}
  @param disks: the disks to look up
  @return: list of the corresponding block devices

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs
3265 def DrbdDisconnectNet(nodes_ip, disks):
3266 """Disconnects the network on a list of drbd devices.
3269 bdevs = _FindDisks(nodes_ip, disks)
3275 except errors.BlockDeviceError, err:
3276 _Fail("Can't change network configuration to standalone mode: %s",
3280 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3281 """Attaches the network on a list of drbd devices.
3284 bdevs = _FindDisks(nodes_ip, disks)
3287 for idx, rd in enumerate(bdevs):
3289 _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3290 except EnvironmentError, err:
3291 _Fail("Can't create symlink: %s", err)
3292 # reconnect disks, switch to new master configuration and if
3293 # needed primary mode
3296 rd.AttachNet(multimaster)
3297 except errors.BlockDeviceError, err:
3298 _Fail("Can't change network configuration: %s", err)
3300 # wait until the disks are connected; we need to retry the re-attach
3301 # if the device becomes standalone, as this might happen if the one
3302 # node disconnects and reconnects in a different mode before the
3303 # other node reconnects; in this case, one or both of the nodes will
3304 # decide it has wrong configuration and switch to standalone
3307 all_connected = True
3310 stats = rd.GetProcStatus()
3312 all_connected = (all_connected and
3313 (stats.is_connected or stats.is_in_resync))
3315 if stats.is_standalone:
3316 # peer had different config info and this node became
3317 # standalone, even though this should not happen with the
3318 # new staged way of changing disk configs
3320 rd.AttachNet(multimaster)
3321 except errors.BlockDeviceError, err:
3322 _Fail("Can't change network configuration: %s", err)
3324 if not all_connected:
3325 raise utils.RetryAgain()
3328 # Start with a delay of 100 miliseconds and go up to 5 seconds
3329 utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3330 except utils.RetryTimeout:
3331 _Fail("Timeout in disk reconnecting")
3334 # change to primary mode
3338 except errors.BlockDeviceError, err:
3339 _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @param nodes_ip: mapping of node names to IP addresses
  @type disks: list of L{objects.Disk}
  @param disks: the DRBD disks to poll
  @return: tuple of (alldone, min_resync) where alldone denotes whether
      all devices have finished resyncing and min_resync is the minimum
      sync percentage seen

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check before we give up on this device
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
3372 def GetDrbdUsermodeHelper():
3373 """Returns DRBD usermode helper currently configured.
3377 return bdev.BaseDRBD.GetUsermodeHelper()
3378 except errors.BlockDeviceError, err:
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  @type hypervisor_type: string
  @param hypervisor_type: hypervisor whose powercycle method to use

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # parent returns immediately; the child performs the powercycle
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running on ram
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script
    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      # always remove the temporary input file
      os.unlink(fin_name)

    return result.stdout
3527 class DevCacheManager(object):
3528 """Simple class for managing a cache of block device information.
3531 _DEV_PREFIX = "/dev/"
3532 _ROOT_DIR = constants.BDEV_CACHE_DIR
3535 def _ConvertPath(cls, dev_path):
3536 """Converts a /dev/name path to the cache file name.
3538 This replaces slashes with underscores and strips the /dev
3539 prefix. It then returns the full path to the cache file.
3542 @param dev_path: the C{/dev/} path name
3544 @return: the converted path name
3547 if dev_path.startswith(cls._DEV_PREFIX):
3548 dev_path = dev_path[len(cls._DEV_PREFIX):]
3549 dev_path = dev_path.replace("/", "_")
3550 fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3554 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3555 """Updates the cache information for a given device.
3558 @param dev_path: the pathname of the device
3560 @param owner: the owner (instance name) of the device
3561 @type on_primary: bool
3562 @param on_primary: whether this is the primary
3565 @param iv_name: the instance-visible name of the
3566 device, as in objects.Disk.iv_name
3571 if dev_path is None:
3572 logging.error("DevCacheManager.UpdateCache got a None dev_path")
3574 fpath = cls._ConvertPath(dev_path)
3580 iv_name = "not_visible"
3581 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3583 utils.WriteFile(fpath, data=fdata)
3584 except EnvironmentError, err:
3585 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3588 def RemoveCache(cls, dev_path):
3589 """Remove data for a dev_path.
3591 This is just a wrapper over L{utils.io.RemoveFile} with a converted
3592 path name and logging.
3595 @param dev_path: the pathname of the device
3600 if dev_path is None:
3601 logging.error("DevCacheManager.RemoveCache got a None dev_path")
3603 fpath = cls._ConvertPath(dev_path)
3605 utils.RemoveFile(fpath)
3606 except EnvironmentError, err:
3607 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)