4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25 the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27 in the L{_CleanDirectory} function
31 # pylint: disable-msg=E1103
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
import base64
import errno
import logging
import os
import random
import re
import time
import zlib

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
# File holding the random boot id of the currently running kernel
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"

# Directories which _CleanDirectory is allowed to wipe; any other path
# passed to it is rejected as a safety measure.
# NOTE(review): reconstructed from the _CleanDirectory call sites in this
# file (DATA_DIR, QUEUE_DIR, JOB_QUEUE_ARCHIVE_DIR, CRYPTO_KEYS_DIR) --
# confirm the full list against the upstream source.
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  constants.CRYPTO_KEYS_DIR,
  ])

# Maximum validity of self-generated X509 certificates, in seconds (one week)
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60

# File names used inside a per-transfer import/export status directory
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
80 class RPCFail(Exception):
81 """Class denoting RPC failure.
83 Its argument is the error message.
def _Fail(msg, *args, **kwargs):
  """Log an error and the raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail: always, with the interpolated message

  """
  # interpolate positional arguments into the message, printf-style
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]:  # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      # log with the current exception's traceback attached
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
112 """Simple wrapper to return a SimpleStore.
114 @rtype: L{ssconf.SimpleStore}
115 @return: a SimpleStore instance
118 return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  # the RPC client always sends a (encoding, content) pair
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    # plain payload, nothing to do
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  # safety check: refuse to wipe anything outside the allowed set
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    # only remove regular files, never symlinks or directories
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  @rtype: frozenset
  @return: the set of file paths UploadFile may write to

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    # NOTE(review): one list entry between CLUSTER_CONF_FILE and
    # SSH_KNOWN_HOSTS_FILE was lost in sampling; ETC_HOSTS matches the
    # upstream source -- confirm
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  # each hypervisor may declare extra files it needs distributed
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed_files)


# Files which UploadFile accepts; computed once at import time
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
212 """Removes job queue files and archived jobs.
218 _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
219 _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
223 """Returns master information.
225 This is an utility function to compute master information, either
226 for consumption here or from the node daemon.
229 @return: master_netdev, master_ip, master_name, primary_ip_family
230 @raise RPCFail: in case of errors
235 master_netdev = cfg.GetMasterNetdev()
236 master_ip = cfg.GetMasterIP()
237 master_node = cfg.GetMasterNode()
238 primary_ip_family = cfg.GetPrimaryIPFamily()
239 except errors.ConfigurationError, err:
240 _Fail("Cluster configuration incomplete: %s", err, exc=True)
241 return (master_netdev, master_ip, master_node, primary_ip_family)
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will either try activate the IP address of the master
  (unless someone else has it) or also start the master daemons, based
  on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to start the master daemons
      (ganeti-masterd and ganeti-rapi), or (if false) activate the
      master ip
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  err_msgs = []
  # either start the master and rapi daemons
  if start_daemons:
    if no_voting:
      masterd_args = "--no-voting --yes-do-it"
    else:
      masterd_args = ""

    env = {
      "EXTRA_MASTERD_ARGS": masterd_args,
      }

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      msg = "Can't start Ganeti master: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)
  # or activate the IP
  else:
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
      if netutils.IPAddress.Own(master_ip):
        # we already have the ip:
        logging.debug("Master IP already configured, doing nothing")
      else:
        msg = "Someone else has the master ip, not activating"
        logging.error(msg)
        err_msgs.append(msg)
    else:
      ipcls = netutils.IP4Address
      if family == netutils.IP6Address.family:
        ipcls = netutils.IP6Address

      result = utils.RunCmd(["ip", "address", "add",
                             "%s/%d" % (master_ip, ipcls.iplen),
                             "dev", master_netdev, "label",
                             "%s:0" % master_netdev])
      if result.failed:
        msg = "Can't activate master IP: %s" % result.output
        logging.error(msg)
        err_msgs.append(msg)

      # we ignore the exit code of the following cmds
      if ipcls == netutils.IP4Address:
        # gratuitous ARP so neighbours update their caches
        utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
                      master_ip, master_ip])
      elif ipcls == netutils.IP6Address:
        try:
          # unsolicited neighbour advertisement (IPv6 equivalent of arping)
          utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
        except errors.OpExecError:
          # TODO: Better error reporting
          logging.warning("Can't execute ndisc6, please install if missing")

  if err_msgs:
    _Fail("; ".join(err_msgs))
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  ipcls = netutils.IP4Address
  if family == netutils.IP6Address.family:
    ipcls = netutils.IP6Address

  result = utils.RunCmd(["ip", "address", "del",
                         "%s/%d" % (master_ip, ipcls.iplen),
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                    " and error %s",
                    result.cmd, result.exit_code, result.output)
def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      # BUG FIX: the original code called RPCFail(...) without raising it,
      # constructing the exception object and silently discarding it; use
      # _Fail, which logs and raises, so invalid calls actually fail
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable-msg=W0702
    # best-effort: removal of secrets must never abort the node removal
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  outputarray = {}

  if vgname is not None:
    vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
    vg_free = vg_size = None
    if vginfo:
      # GetVGInfo returns (free, size, name) tuples
      vg_free = int(round(vginfo[0][0], 0))
      vg_size = int(round(vginfo[0][1], 0))
    outputarray['vg_size'] = vg_size
    outputarray['vg_free'] = vg_free

  if hypervisor_type is not None:
    hyper = hypervisor.GetHypervisor(hypervisor_type)
    hyp_info = hyper.GetNodeInfo()
    if hyp_info is not None:
      outputarray.update(hyp_info)

  # boot id changes on every reboot, letting the master detect restarts
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return outputarray
462 def VerifyNode(what, cluster_name):
463 """Verify the status of the local node.
465 Based on the input L{what} parameter, various checks are done on the
468 If the I{filelist} key is present, this list of
469 files is checksummed and the file/checksum pairs are returned.
471 If the I{nodelist} key is present, we check that we have
472 connectivity via ssh with the target nodes (and check the hostname
475 If the I{node-net-test} key is present, we check that we have
476 connectivity to the given nodes via both primary IP and, if
477 applicable, secondary IPs.
480 @param what: a dictionary of things to check:
481 - filelist: list of files for which to compute checksums
482 - nodelist: list of nodes we should check ssh communication with
483 - node-net-test: list of nodes we should check node daemon port
485 - hypervisor: list with hypervisors to run the verify for
487 @return: a dictionary with the same keys as the input dict, and
488 values representing the result of the checks
492 my_name = netutils.Hostname.GetSysName()
493 port = netutils.GetDaemonPort(constants.NODED)
495 if constants.NV_HYPERVISOR in what:
496 result[constants.NV_HYPERVISOR] = tmp = {}
497 for hv_name in what[constants.NV_HYPERVISOR]:
499 val = hypervisor.GetHypervisor(hv_name).Verify()
500 except errors.HypervisorError, err:
501 val = "Error while checking hypervisor: %s" % str(err)
504 if constants.NV_FILELIST in what:
505 result[constants.NV_FILELIST] = utils.FingerprintFiles(
506 what[constants.NV_FILELIST])
508 if constants.NV_NODELIST in what:
509 result[constants.NV_NODELIST] = tmp = {}
510 random.shuffle(what[constants.NV_NODELIST])
511 for node in what[constants.NV_NODELIST]:
512 success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
516 if constants.NV_NODENETTEST in what:
517 result[constants.NV_NODENETTEST] = tmp = {}
518 my_pip = my_sip = None
519 for name, pip, sip in what[constants.NV_NODENETTEST]:
525 tmp[my_name] = ("Can't find my own primary/secondary IP"
528 for name, pip, sip in what[constants.NV_NODENETTEST]:
530 if not netutils.TcpPing(pip, port, source=my_pip):
531 fail.append("primary")
533 if not netutils.TcpPing(sip, port, source=my_sip):
534 fail.append("secondary")
536 tmp[name] = ("failure using the %s interface(s)" %
539 if constants.NV_MASTERIP in what:
540 # FIXME: add checks on incoming data structures (here and in the
541 # rest of the function)
542 master_name, master_ip = what[constants.NV_MASTERIP]
543 if master_name == my_name:
544 source = constants.IP4_ADDRESS_LOCALHOST
547 result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
550 if constants.NV_LVLIST in what:
552 val = GetVolumeList(what[constants.NV_LVLIST])
555 result[constants.NV_LVLIST] = val
557 if constants.NV_INSTANCELIST in what:
558 # GetInstanceList can fail
560 val = GetInstanceList(what[constants.NV_INSTANCELIST])
563 result[constants.NV_INSTANCELIST] = val
565 if constants.NV_VGLIST in what:
566 result[constants.NV_VGLIST] = utils.ListVolumeGroups()
568 if constants.NV_PVLIST in what:
569 result[constants.NV_PVLIST] = \
570 bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
571 filter_allocatable=False)
573 if constants.NV_VERSION in what:
574 result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
575 constants.RELEASE_VERSION)
577 if constants.NV_HVINFO in what:
578 hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
579 result[constants.NV_HVINFO] = hyper.GetNodeInfo()
581 if constants.NV_DRBDLIST in what:
583 used_minors = bdev.DRBD8.GetUsedDevs().keys()
584 except errors.BlockDeviceError, err:
585 logging.warning("Can't get used minors list", exc_info=True)
586 used_minors = str(err)
587 result[constants.NV_DRBDLIST] = used_minors
589 if constants.NV_DRBDHELPER in what:
592 payload = bdev.BaseDRBD.GetUsermodeHelper()
593 except errors.BlockDeviceError, err:
594 logging.error("Can't get DRBD usermode helper: %s", str(err))
597 result[constants.NV_DRBDHELPER] = (status, payload)
599 if constants.NV_NODESETUP in what:
600 result[constants.NV_NODESETUP] = tmpr = []
601 if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
602 tmpr.append("The sysfs filesytem doesn't seem to be mounted"
603 " under /sys, missing required directories /sys/block"
604 " and /sys/class/net")
605 if (not os.path.isdir("/proc/sys") or
606 not os.path.isfile("/proc/sysrq-trigger")):
607 tmpr.append("The procfs filesystem doesn't seem to be mounted"
608 " under /proc, missing required directory /proc/sys and"
609 " the file /proc/sysrq-trigger")
611 if constants.NV_TIME in what:
612 result[constants.NV_TIME] = utils.SplitTime(time.time())
614 if constants.NV_OSLIST in what:
615 result[constants.NV_OSLIST] = DiagnoseOS()
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  # raw string so the escaped pipes are not eaten by Python first
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr positions: [0] volume type, [4] state, [5] open count
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[name] = (size, inactive, online)

  return lvs
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
676 """List all volumes on this node.
680 A list of dictionaries, each having four keys:
681 - name: the logical volume name,
682 - size: the size of the logical volume
683 - dev: the physical device on which the LV lives
684 - vg: the volume group to which it belongs
686 In case of errors, we return an empty list and log the
689 Note that since a logical volume can live on multiple physical
690 volumes, the resulting list might include a logical volume
694 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
696 "--options=lv_name,lv_size,devices,vg_name"])
698 _Fail("Failed to list logical volumes, lvs output: %s",
702 return dev.split('(')[0]
705 return [parse_dev(x) for x in dev.split(",")]
708 line = [v.strip() for v in line]
709 return [{'name': line[0], 'size': line[1],
710 'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
713 for line in result.stdout.splitlines():
714 if line.count('|') >= 3:
715 all_devs.extend(map_line(line.split('|')))
717 logging.warning("Strange line in the output from lvs: '%s'", line)
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
737 def GetInstanceList(hypervisor_list):
738 """Provides a list of instances.
740 @type hypervisor_list: list
741 @param hypervisor_list: the list of hypervisors to query information
744 @return: a list of all running instances on the current node
745 - instance1.example.com
746 - instance2.example.com
750 for hname in hypervisor_list:
752 names = hypervisor.GetHypervisor(hname).ListInstances()
753 results.extend(names)
754 except errors.HypervisorError, err:
755 _Fail("Error enumerating instances (hypervisor %s): %s",
756 hname, err, exc=True)
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # GetInstanceInfo returns (name, id, memory, vcpus, state, time)
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # use enumerate for consistency with _RemoveBlockDevLinks
  for idx, _ in enumerate(instance.disks):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output
def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @rtype: string
  @return: the full path of the log file, under LOG_OS_DIR

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  base = ("%s-%s-%s-%s.log" %
          (kind, os_name, instance, utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, base)
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    # log=False: the tail of the log file is already the error report
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    logging.error("os create command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    # log=False: the tail of the log file is already the error report
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for an instance's disk.

  @type instance_name: string
  @param instance_name: the name of the owning instance
  @type idx: int
  @param idx: the index of the disk
  @rtype: string
  @return: path under DISK_LINKS_DIR of the form "<instance>:<idx>"

  """
  return utils.PathJoin(constants.DISK_LINKS_DIR,
                        "%s:%d" % (instance_name, idx))
944 def _SymlinkBlockDev(instance_name, device_path, idx):
945 """Set up symlinks to a instance's block device.
947 This is an auxiliary function run when an instance is start (on the primary
948 node) or when an instance is migrated (on the target node).
951 @param instance_name: the name of the target instance
952 @param device_path: path of the physical block device, on the node
953 @param idx: the disk index
954 @return: absolute path to the disk's symlink
957 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
959 os.symlink(device_path, link_name)
961 if err.errno == errno.EEXIST:
962 if (not os.path.islink(link_name) or
963 os.readlink(link_name) != device_path):
965 os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @param instance_name: the name of the owning instance
  @param disks: list of disk objects; only their count is used

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        # best-effort cleanup; just record the failure
        logging.exception("Can't remove symlink '%s'", link_name)
985 def _GatherAndLinkBlockDevs(instance):
986 """Set up an instance's block device(s).
988 This is run on the primary node at instance startup. The block
989 devices must be already assembled.
991 @type instance: L{objects.Instance}
992 @param instance: the instance whose disks we shoul assemble
994 @return: list of (disk_object, device_path)
998 for idx, disk in enumerate(instance.disks):
999 device = _RecursiveFindBD(disk)
1001 raise errors.BlockDeviceError("Block device '%s' is not set up." %
1005 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1007 raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1010 block_devices.append((disk, link_name))
1012 return block_devices
1015 def StartInstance(instance):
1016 """Start an instance.
1018 @type instance: L{objects.Instance}
1019 @param instance: the instance object
1023 running_instances = GetInstanceList([instance.hypervisor])
1025 if instance.name in running_instances:
1026 logging.info("Instance %s already running, not starting", instance.name)
1030 block_devices = _GatherAndLinkBlockDevs(instance)
1031 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1032 hyper.StartInstance(instance, block_devices)
1033 except errors.BlockDeviceError, err:
1034 _Fail("Block device error: %s", err, exc=True)
1035 except errors.HypervisorError, err:
1036 _RemoveBlockDevLinks(instance.name, instance.disks)
1037 _Fail("Hypervisor error: %s", err, exc=True)
1040 def InstanceShutdown(instance, timeout):
1041 """Shut an instance down.
1043 @note: this functions uses polling with a hardcoded timeout.
1045 @type instance: L{objects.Instance}
1046 @param instance: the instance object
1047 @type timeout: integer
1048 @param timeout: maximum timeout for soft shutdown
1052 hv_name = instance.hypervisor
1053 hyper = hypervisor.GetHypervisor(hv_name)
1054 iname = instance.name
1056 if instance.name not in hyper.ListInstances():
1057 logging.info("Instance %s not running, doing nothing", iname)
1062 self.tried_once = False
1065 if iname not in hyper.ListInstances():
1069 hyper.StopInstance(instance, retry=self.tried_once)
1070 except errors.HypervisorError, err:
1071 if iname not in hyper.ListInstances():
1072 # if the instance is no longer existing, consider this a
1073 # success and go to cleanup
1076 _Fail("Failed to stop instance %s: %s", iname, err)
1078 self.tried_once = True
1080 raise utils.RetryAgain()
1083 utils.Retry(_TryShutdown(), 5, timeout)
1084 except utils.RetryTimeout:
1085 # the shutdown did not succeed
1086 logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1089 hyper.StopInstance(instance, force=True)
1090 except errors.HypervisorError, err:
1091 if iname in hyper.ListInstances():
1092 # only raise an error if the instance still exists, otherwise
1093 # the error could simply be "instance ... unknown"!
1094 _Fail("Failed to force stop instance %s: %s", iname, err)
1098 if iname in hyper.ListInstances():
1099 _Fail("Could not shutdown instance %s even by destroy", iname)
1102 hyper.CleanupInstance(instance.name)
1103 except errors.HypervisorError, err:
1104 logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1106 _RemoveBlockDevLinks(iname, instance.disks)
1109 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1110 """Reboot an instance.
1112 @type instance: L{objects.Instance}
1113 @param instance: the instance object to reboot
1114 @type reboot_type: str
1115 @param reboot_type: the type of reboot, one the following
1117 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1118 instance OS, do not recreate the VM
1119 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1120 restart the VM (at the hypervisor level)
1121 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1122 not accepted here, since that mode is handled differently, in
1123 cmdlib, and translates into full stop and start of the
1124 instance (instead of a call_instance_reboot RPC)
1125 @type shutdown_timeout: integer
1126 @param shutdown_timeout: maximum timeout for soft shutdown
1130 running_instances = GetInstanceList([instance.hypervisor])
1132 if instance.name not in running_instances:
1133 _Fail("Cannot reboot instance %s that is not running", instance.name)
1135 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1136 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1138 hyper.RebootInstance(instance)
1139 except errors.HypervisorError, err:
1140 _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1141 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1143 InstanceShutdown(instance, shutdown_timeout)
1144 return StartInstance(instance)
1145 except errors.HypervisorError, err:
1146 _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1148 _Fail("Invalid reboot_type received: %s", reboot_type)
1151 def MigrationInfo(instance):
1152 """Gather information about an instance to be migrated.
1154 @type instance: L{objects.Instance}
1155 @param instance: the instance definition
1158 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1160 info = hyper.MigrationInfo(instance)
1161 except errors.HypervisorError, err:
1162 _Fail("Failed to fetch migration information: %s", err, exc=True)
1166 def AcceptInstance(instance, info, target):
1167 """Prepare the node to accept an instance.
1169 @type instance: L{objects.Instance}
1170 @param instance: the instance definition
1171 @type info: string/data (opaque)
1172 @param info: migration information, from the source node
1173 @type target: string
1174 @param target: target host (usually ip), on this node
1177 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1179 hyper.AcceptInstance(instance, info, target)
1180 except errors.HypervisorError, err:
1181 _Fail("Failed to accept instance: %s", err, exc=True)
1184 def FinalizeMigration(instance, info, success):
1185 """Finalize any preparation to accept an instance.
1187 @type instance: L{objects.Instance}
1188 @param instance: the instance definition
1189 @type info: string/data (opaque)
1190 @param info: migration information, from the source node
1191 @type success: boolean
1192 @param success: whether the migration was a success or a failure
1195 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1197 hyper.FinalizeMigration(instance, info, success)
1198 except errors.HypervisorError, err:
1199 _Fail("Failed to finalize migration: %s", err, exc=True)
1202 def MigrateInstance(instance, target, live):
1203 """Migrates an instance to another node.
1205 @type instance: L{objects.Instance}
1206 @param instance: the instance definition
1207 @type target: string
1208 @param target: the target node name
1210 @param live: whether the migration should be done live or not (the
1211 interpretation of this parameter is left to the hypervisor)
1213 @return: a tuple of (success, msg) where:
1214 - succes is a boolean denoting the success/failure of the operation
1215 - msg is a string with details in case of failure
1218 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1221 hyper.MigrateInstance(instance, target, live)
1222 except errors.HypervisorError, err:
1223 _Fail("Failed to migrate instance: %s", err, exc=True)
1226 def BlockdevCreate(disk, size, owner, on_primary, info):
1227 """Creates a block device for an instance.
1229 @type disk: L{objects.Disk}
1230 @param disk: the object describing the disk we should create
1232 @param size: the size of the physical underlying device, in MiB
1234 @param owner: the name of the instance for which disk is created,
1235 used for device cache data
1236 @type on_primary: boolean
1237 @param on_primary: indicates if it is the primary node or not
1239 @param info: string that will be sent to the physical device
1240 creation, used for example to set (LVM) tags on LVs
1242 @return: the new unique_id of the device (this can sometime be
1243 computed only after creation), or None. On secondary nodes,
1244 it's not required to return anything.
1247 # TODO: remove the obsolete 'size' argument
1248 # pylint: disable-msg=W0613
1251 for child in disk.children:
1253 crdev = _RecursiveAssembleBD(child, owner, on_primary)
1254 except errors.BlockDeviceError, err:
1255 _Fail("Can't assemble device %s: %s", child, err)
1256 if on_primary or disk.AssembleOnSecondary():
1257 # we need the children open in case the device itself has to
1260 # pylint: disable-msg=E1103
1262 except errors.BlockDeviceError, err:
1263 _Fail("Can't make child '%s' read-write: %s", child, err)
1267 device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1268 except errors.BlockDeviceError, err:
1269 _Fail("Can't create block device: %s", err)
1271 if on_primary or disk.AssembleOnSecondary():
1274 except errors.BlockDeviceError, err:
1275 _Fail("Can't assemble device after creation, unusual event: %s", err)
1276 device.SetSyncSpeed(constants.SYNC_SPEED)
1277 if on_primary or disk.OpenOnSecondary():
1279 device.Open(force=True)
1280 except errors.BlockDeviceError, err:
1281 _Fail("Can't make device r/w after creation, unusual event: %s", err)
1282 DevCacheManager.UpdateCache(device.dev_path, owner,
1283 on_primary, disk.iv_name)
1285 device.SetInfo(info)
1287 return device.unique_id
1290 def BlockdevRemove(disk):
1291 """Remove a block device.
1293 @note: This is intended to be called recursively.
1295 @type disk: L{objects.Disk}
1296 @param disk: the disk object we should remove
1298 @return: the success of the operation
1303 rdev = _RecursiveFindBD(disk)
1304 except errors.BlockDeviceError, err:
1305 # probably can't attach
1306 logging.info("Can't attach to device %s in remove", disk)
1308 if rdev is not None:
1309 r_path = rdev.dev_path
1312 except errors.BlockDeviceError, err:
1313 msgs.append(str(err))
1315 DevCacheManager.RemoveCache(r_path)
1318 for child in disk.children:
1320 BlockdevRemove(child)
1321 except RPCFail, err:
1322 msgs.append(str(err))
1325 _Fail("; ".join(msgs))
1328 def _RecursiveAssembleBD(disk, owner, as_primary):
1329 """Activate a block device for an instance.
1331 This is run on the primary and secondary nodes for an instance.
1333 @note: this function is called recursively.
1335 @type disk: L{objects.Disk}
1336 @param disk: the disk we try to assemble
1338 @param owner: the name of the instance which owns the disk
1339 @type as_primary: boolean
1340 @param as_primary: if we should make the block device
1343 @return: the assembled device or None (in case no device
1345 @raise errors.BlockDeviceError: in case there is an error
1346 during the activation of the children or the device
1352 mcn = disk.ChildrenNeeded()
1354 mcn = 0 # max number of Nones allowed
1356 mcn = len(disk.children) - mcn # max number of Nones
1357 for chld_disk in disk.children:
1359 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1360 except errors.BlockDeviceError, err:
1361 if children.count(None) >= mcn:
1364 logging.error("Error in child activation (but continuing): %s",
1366 children.append(cdev)
1368 if as_primary or disk.AssembleOnSecondary():
1369 r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1370 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1372 if as_primary or disk.OpenOnSecondary():
1374 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1375 as_primary, disk.iv_name)
1382 def BlockdevAssemble(disk, owner, as_primary):
1383 """Activate a block device for an instance.
1385 This is a wrapper over _RecursiveAssembleBD.
1387 @rtype: str or boolean
1388 @return: a C{/dev/...} path for primary nodes, and
1389 C{True} for secondary nodes
1393 result = _RecursiveAssembleBD(disk, owner, as_primary)
1394 if isinstance(result, bdev.BlockDev):
1395 # pylint: disable-msg=E1103
1396 result = result.dev_path
1397 except errors.BlockDeviceError, err:
1398 _Fail("Error while assembling disk: %s", err, exc=True)
1403 def BlockdevShutdown(disk):
1404 """Shut down a block device.
1406 First, if the device is assembled (Attach() is successful), then
1407 the device is shutdown. Then the children of the device are
1410 This function is called recursively. Note that we don't cache the
1411 children or such, as oppossed to assemble, shutdown of different
1412 devices doesn't require that the upper device was active.
1414 @type disk: L{objects.Disk}
1415 @param disk: the description of the disk we should
1421 r_dev = _RecursiveFindBD(disk)
1422 if r_dev is not None:
1423 r_path = r_dev.dev_path
1426 DevCacheManager.RemoveCache(r_path)
1427 except errors.BlockDeviceError, err:
1428 msgs.append(str(err))
1431 for child in disk.children:
1433 BlockdevShutdown(child)
1434 except RPCFail, err:
1435 msgs.append(str(err))
1438 _Fail("; ".join(msgs))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    # at least one child device could not be found/attached
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      # no static path known; look the device up to get its path
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    # recursively resolve the children first, as FindDevice needs them
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open
  @return: the opened block device instance
  @raise RPCFail: if the device is not set up

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk
1549 def BlockdevFind(disk):
1550 """Check if a device is activated.
1552 If it is, return information about the real device.
1554 @type disk: L{objects.Disk}
1555 @param disk: the disk to find
1556 @rtype: None or objects.BlockDevStatus
1557 @return: None if the disk cannot be found, otherwise a the current
1562 rbd = _RecursiveFindBD(disk)
1563 except errors.BlockDeviceError, err:
1564 _Fail("Failed to find device: %s", err, exc=True)
1569 return rbd.GetSyncStatus()
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  # whitelist check: only files in _ALLOWED_UPLOAD_FILES may be overwritten
  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)
1687 def _ErrnoOrStr(err):
1688 """Format an EnvironmentError exception.
1690 If the L{err} argument has an errno attribute, it will be looked up
1691 and converted into a textual C{E...} description. Otherwise the
1692 string representation of the error will be returned.
1694 @type err: L{EnvironmentError}
1695 @param err: the exception to format
1698 if hasattr(err, 'errno'):
1699 detail = errno.errorcode[err.errno]
1705 def _OSOndiskAPIVersion(os_dir):
1706 """Compute and return the API version of a given OS.
1708 This function will try to read the API version of the OS residing in
1709 the 'os_dir' directory.
1712 @param os_dir: the directory in which we should look for the OS
1714 @return: tuple (status, data) with status denoting the validity and
1715 data holding either the vaid versions or an error message
1718 api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1721 st = os.stat(api_file)
1722 except EnvironmentError, err:
1723 return False, ("Required file '%s' not found under path %s: %s" %
1724 (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1726 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1727 return False, ("File '%s' in %s is not a regular file" %
1728 (constants.OS_API_FILE, os_dir))
1731 api_versions = utils.ReadFile(api_file).splitlines()
1732 except EnvironmentError, err:
1733 return False, ("Error while reading the API version file at %s: %s" %
1734 (api_file, _ErrnoOrStr(err)))
1737 api_versions = [int(version.strip()) for version in api_versions]
1738 except (TypeError, ValueError), err:
1739 return False, ("API version(s) can't be converted to integer: %s" %
1742 return True, api_versions
1745 def DiagnoseOS(top_dirs=None):
1746 """Compute the validity for all OSes.
1748 @type top_dirs: list
1749 @param top_dirs: the list of directories in which to
1750 search (if not given defaults to
1751 L{constants.OS_SEARCH_PATH})
1752 @rtype: list of L{objects.OS}
1753 @return: a list of tuples (name, path, status, diagnose, variants,
1754 parameters, api_version) for all (potential) OSes under all
1755 search paths, where:
1756 - name is the (potential) OS name
1757 - path is the full path to the OS
1758 - status True/False is the validity of the OS
1759 - diagnose is the error message for an invalid OS, otherwise empty
1760 - variants is a list of supported OS variants, if any
1761 - parameters is a list of (name, help) parameters, if any
1762 - api_version is a list of support OS API versions
1765 if top_dirs is None:
1766 top_dirs = constants.OS_SEARCH_PATH
1769 for dir_name in top_dirs:
1770 if os.path.isdir(dir_name):
1772 f_names = utils.ListVisibleFiles(dir_name)
1773 except EnvironmentError, err:
1774 logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1776 for name in f_names:
1777 os_path = utils.PathJoin(dir_name, name)
1778 status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1781 variants = os_inst.supported_variants
1782 parameters = os_inst.supported_parameters
1783 api_versions = os_inst.api_versions
1786 variants = parameters = api_versions = []
1787 result.append((name, os_path, status, diagnose, variants,
1788 parameters, api_versions))
1793 def _TryOSFromDisk(name, base_dir=None):
1794 """Create an OS instance from disk.
1796 This function will return an OS instance if the given name is a
1799 @type base_dir: string
1800 @keyword base_dir: Base directory containing OS installations.
1801 Defaults to a search in all the OS_SEARCH_PATH dirs.
1803 @return: success and either the OS instance if we find a valid one,
1807 if base_dir is None:
1808 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1810 os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1813 return False, "Directory for OS %s not found in search path" % name
1815 status, api_versions = _OSOndiskAPIVersion(os_dir)
1818 return status, api_versions
1820 if not constants.OS_API_VERSIONS.intersection(api_versions):
1821 return False, ("API version mismatch for path '%s': found %s, want %s." %
1822 (os_dir, api_versions, constants.OS_API_VERSIONS))
1824 # OS Files dictionary, we will populate it with the absolute path names
1825 os_files = dict.fromkeys(constants.OS_SCRIPTS)
1827 if max(api_versions) >= constants.OS_API_V15:
1828 os_files[constants.OS_VARIANTS_FILE] = ''
1830 if max(api_versions) >= constants.OS_API_V20:
1831 os_files[constants.OS_PARAMETERS_FILE] = ''
1833 del os_files[constants.OS_SCRIPT_VERIFY]
1835 for filename in os_files:
1836 os_files[filename] = utils.PathJoin(os_dir, filename)
1839 st = os.stat(os_files[filename])
1840 except EnvironmentError, err:
1841 return False, ("File '%s' under path '%s' is missing (%s)" %
1842 (filename, os_dir, _ErrnoOrStr(err)))
1844 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1845 return False, ("File '%s' under path '%s' is not a regular file" %
1848 if filename in constants.OS_SCRIPTS:
1849 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1850 return False, ("File '%s' under path '%s' is not executable" %
1854 if constants.OS_VARIANTS_FILE in os_files:
1855 variants_file = os_files[constants.OS_VARIANTS_FILE]
1857 variants = utils.ReadFile(variants_file).splitlines()
1858 except EnvironmentError, err:
1859 return False, ("Error while reading the OS variants file at %s: %s" %
1860 (variants_file, _ErrnoOrStr(err)))
1862 return False, ("No supported os variant found")
1865 if constants.OS_PARAMETERS_FILE in os_files:
1866 parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1868 parameters = utils.ReadFile(parameters_file).splitlines()
1869 except EnvironmentError, err:
1870 return False, ("Error while reading the OS parameters file at %s: %s" %
1871 (parameters_file, _ErrnoOrStr(err)))
1872 parameters = [v.split(None, 1) for v in parameters]
1874 os_obj = objects.OS(name=name, path=os_dir,
1875 create_script=os_files[constants.OS_SCRIPT_CREATE],
1876 export_script=os_files[constants.OS_SCRIPT_EXPORT],
1877 import_script=os_files[constants.OS_SCRIPT_IMPORT],
1878 rename_script=os_files[constants.OS_SCRIPT_RENAME],
1879 verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1881 supported_variants=variants,
1882 supported_parameters=parameters,
1883 api_versions=api_versions)
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # strip the variant suffix ("name+variant") before the disk lookup
  name_only = name.split("+", 1)[0]
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload
def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables

  """
  result = {}
  # pick the highest OS API version supported by both sides
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['OS_NAME'] = inst_os.name
  result['DEBUG_LEVEL'] = '%d' % debug

  # OS variants
  if api_version >= constants.OS_API_V15:
    try:
      variant = os_name.split('+', 1)[1]
    except IndexError:
      # no explicit variant in the name: default to the first supported one
      variant = inst_os.supported_variants[0]
    result['OS_VARIANT'] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result['OSP_%s' % pname.upper()] = pvalue

  return result
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
2012 def BlockdevGrow(disk, amount):
2013 """Grow a stack of block devices.
2015 This function is called recursively, with the childrens being the
2016 first ones to resize.
2018 @type disk: L{objects.Disk}
2019 @param disk: the disk to be grown
2020 @rtype: (status, result)
2021 @return: a tuple with the status of the operation
2022 (True/False), and the errors message if status
2026 r_dev = _RecursiveFindBD(disk)
2028 _Fail("Cannot find block device %s", disk)
2032 except errors.BlockDeviceError, err:
2033 _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.dev_type == constants.LD_DRBD8:
    # snapshot the backing storage, not the DRBD device itself
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file
  @rtype: None

  """
  # build in a ".new" directory first, then atomically move it in place
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
                 '%s' % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      # skip disks which could not be snapshotted (None entries)
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
2170 """Return a list of exports currently available on this machine.
2173 @return: list of the exports
2176 if os.path.isdir(constants.EXPORT_DIR):
2177 return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2179 _Fail("No exports directory")
2182 def RemoveExport(export):
2183 """Remove an existing export from the node.
2186 @param export: the name of the export to remove
2190 target = utils.PathJoin(constants.EXPORT_DIR, export)
2193 shutil.rmtree(target)
2194 except EnvironmentError, err:
2195 _Fail("Error while removing the export: %s", err, exc=True)
2198 def BlockdevRename(devlist):
2199 """Rename a list of block devices.
2201 @type devlist: list of tuples
2202 @param devlist: list of tuples of the form (disk,
2203 new_logical_id, new_physical_id); disk is an
2204 L{objects.Disk} object describing the current disk,
2205 and new logical_id/physical_id is the name we
2208 @return: True if all renames succeeded, False otherwise
2213 for disk, unique_id in devlist:
2214 dev = _RecursiveFindBD(disk)
2216 msgs.append("Can't find device %s in rename" % str(disk))
2220 old_rpath = dev.dev_path
2221 dev.Rename(unique_id)
2222 new_rpath = dev.dev_path
2223 if old_rpath != new_rpath:
2224 DevCacheManager.RemoveCache(old_rpath)
2225 # FIXME: we should add the new cache information here, like:
2226 # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2227 # but we don't have the owner here - maybe parse from existing
2228 # cache? for now, we only lose lvm data when we rename, which
2229 # is less critical than DRBD or MD
2230 except errors.BlockDeviceError, err:
2231 msgs.append("Can't rename device '%s' to '%s': %s" %
2232 (dev, unique_id, err))
2233 logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2236 _Fail("; ".join(msgs))
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks wheter the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check
  @rtype: str
  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # reject paths escaping the configured base directory
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
      base_file_storage_dir):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
2264 def CreateFileStorageDir(file_storage_dir):
2265 """Create file storage directory.
2267 @type file_storage_dir: str
2268 @param file_storage_dir: directory to create
2271 @return: tuple with first element a boolean indicating wheter dir
2272 creation was successful or not
2275 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2276 if os.path.exists(file_storage_dir):
2277 if not os.path.isdir(file_storage_dir):
2278 _Fail("Specified storage dir '%s' is not a directory",
2282 os.makedirs(file_storage_dir, 0750)
2283 except OSError, err:
2284 _Fail("Cannot create file storage directory '%s': %s",
2285 file_storage_dir, err, exc=True)
2288 def RemoveFileStorageDir(file_storage_dir):
2289 """Remove file storage directory.
2291 Remove it only if it's empty. If not log an error and return.
2293 @type file_storage_dir: str
2294 @param file_storage_dir: the directory we should cleanup
2295 @rtype: tuple (success,)
2296 @return: tuple of one element, C{success}, denoting
2297 whether the operation was successful
2300 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2301 if os.path.exists(file_storage_dir):
2302 if not os.path.isdir(file_storage_dir):
2303 _Fail("Specified Storage directory '%s' is not a directory",
2305 # deletes dir only if empty, otherwise we want to fail the rpc call
2307 os.rmdir(file_storage_dir)
2308 except OSError, err:
2309 _Fail("Cannot remove file storage directory '%s': %s",
2310 file_storage_dir, err)
2313 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2314 """Rename the file storage directory.
2316 @type old_file_storage_dir: str
2317 @param old_file_storage_dir: the current path
2318 @type new_file_storage_dir: str
2319 @param new_file_storage_dir: the name we should rename to
2320 @rtype: tuple (success,)
2321 @return: tuple of one element, C{success}, denoting
2322 whether the operation was successful
2325 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2326 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2327 if not os.path.exists(new_file_storage_dir):
2328 if os.path.isdir(old_file_storage_dir):
2330 os.rename(old_file_storage_dir, new_file_storage_dir)
2331 except OSError, err:
2332 _Fail("Cannot rename '%s' to '%s': %s",
2333 old_file_storage_dir, new_file_storage_dir, err)
2335 _Fail("Specified storage dir '%s' is not a directory",
2336 old_file_storage_dir)
2338 if os.path.exists(old_file_storage_dir):
2339 _Fail("Cannot rename '%s' to '%s': both locations exist",
2340 old_file_storage_dir, new_file_storage_dir)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # path-containment test: file_name must be under the queue directory
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically; ownership goes to masterd so
  # the master daemon can manage the queue files afterwards
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  # both source and destination must stay inside the queue directory
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)
2401 def BlockdevClose(instance_name, disks):
2402 """Closes the given block devices.
2404 This means they will be switched to secondary mode (in case of
2407 @param instance_name: if the argument is not empty, the symlinks
2408 of this instance will be removed
2409 @type disks: list of L{objects.Disk}
2410 @param disks: the list of disks to be closed
2411 @rtype: tuple (success, message)
2412 @return: a tuple of success and message, where success
2413 indicates the succes of the operation, and message
2414 which will contain the error details in case we
2420 rd = _RecursiveFindBD(cf)
2422 _Fail("Can't find device %s", cf)
2429 except errors.BlockDeviceError, err:
2430 msg.append(str(err))
2432 _Fail("Can't make devices secondary: %s", ",".join(msg))
2435 _RemoveBlockDevLinks(instance_name, disks)
2438 def ValidateHVParams(hvname, hvparams):
2439 """Validates the given hypervisor parameters.
2441 @type hvname: string
2442 @param hvname: the hypervisor name
2443 @type hvparams: dict
2444 @param hvparams: the hypervisor parameters to be validated
2449 hv_type = hypervisor.GetHypervisor(hvname)
2450 hv_type.ValidateParameters(hvparams)
2451 except errors.HypervisorError, err:
2452 _Fail(str(err), log=False)
def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  # supported_parameters is a list of (name, description) pairs
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  # strip any variant ("name+variant") before looking on disk
  name_only = osname.split("+", 1)[0]
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      # on failure, tbv holds the error message string
      _Fail(tbv)
    else:
      return False

  # old API versions have no parameter support, nothing to validate
  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True
2520 """Demotes the current node from master candidate role.
2523 # try to ensure we're not the master by mistake
2524 master, myself = ssconf.GetMasterAndMyself()
2525 if master == myself:
2526 _Fail("ssconf status shows I'm the master node, will not demote")
2528 result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2529 if not result.failed:
2530 _Fail("The master daemon is running, will not demote")
2533 if os.path.isfile(constants.CLUSTER_CONF_FILE):
2534 utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2535 except EnvironmentError, err:
2536 if err.errno != errno.ENOENT:
2537 _Fail("Error while backing up cluster file: %s", err, exc=True)
2539 utils.RemoveFile(constants.CLUSTER_CONF_FILE)
def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  @rtype: tuple; (string, string, string)
  @return: certificate directory, key file path and certificate
      file path

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2551 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2552 """Creates a new X509 certificate for SSL/TLS.
2555 @param validity: Validity in seconds
2556 @rtype: tuple; (string, string)
2557 @return: Certificate name and public part
2560 (key_pem, cert_pem) = \
2561 utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2562 min(validity, _MAX_SSL_CERT_VALIDITY))
2564 cert_dir = tempfile.mkdtemp(dir=cryptodir,
2565 prefix="x509-%s-" % utils.TimestampForFilename())
2567 name = os.path.basename(cert_dir)
2568 assert len(name) > 5
2570 (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2572 utils.WriteFile(key_file, mode=0400, data=key_pem)
2573 utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2575 # Never return private key as it shouldn't leave the node
2576 return (name, cert_pem)
2578 shutil.rmtree(cert_dir, ignore_errors=True)
2582 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2583 """Removes a X509 certificate.
2586 @param name: Certificate name
2589 (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2591 utils.RemoveFile(key_file)
2592 utils.RemoveFile(cert_file)
2596 except EnvironmentError, err:
2597 _Fail("Cannot remove certificate directory '%s': %s",
2601 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2602 """Returns the command for the requested input/output.
2604 @type instance: L{objects.Instance}
2605 @param instance: The instance object
2606 @param mode: Import/export mode
2607 @param ieio: Input/output type
2608 @param ieargs: Input/output arguments
2611 assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2618 if ieio == constants.IEIO_FILE:
2619 (filename, ) = ieargs
2621 if not utils.IsNormAbsPath(filename):
2622 _Fail("Path '%s' is not normalized or absolute", filename)
2624 directory = os.path.normpath(os.path.dirname(filename))
2626 if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2627 constants.EXPORT_DIR):
2628 _Fail("File '%s' is not under exports directory '%s'",
2629 filename, constants.EXPORT_DIR)
2632 utils.Makedirs(directory, mode=0750)
2634 quoted_filename = utils.ShellQuote(filename)
2636 if mode == constants.IEM_IMPORT:
2637 suffix = "> %s" % quoted_filename
2638 elif mode == constants.IEM_EXPORT:
2639 suffix = "< %s" % quoted_filename
2641 # Retrieve file size
2643 st = os.stat(filename)
2644 except EnvironmentError, err:
2645 logging.error("Can't stat(2) %s: %s", filename, err)
2647 exp_size = utils.BytesToMebibyte(st.st_size)
2649 elif ieio == constants.IEIO_RAW_DISK:
2652 real_disk = _OpenRealBD(disk)
2654 if mode == constants.IEM_IMPORT:
2655 # we set here a smaller block size as, due to transport buffering, more
2656 # than 64-128k will mostly ignored; we use nocreat to fail if the device
2657 # is not already there or we pass a wrong path; we use notrunc to no
2658 # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2659 # much memory; this means that at best, we flush every 64k, which will
2661 suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2662 " bs=%s oflag=dsync"),
2666 elif mode == constants.IEM_EXPORT:
2667 # the block size on the read dd is 1MiB to match our units
2668 prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2670 str(1024 * 1024), # 1 MB
2672 exp_size = disk.size
2674 elif ieio == constants.IEIO_SCRIPT:
2675 (disk, disk_index, ) = ieargs
2677 assert isinstance(disk_index, (int, long))
2679 real_disk = _OpenRealBD(disk)
2681 inst_os = OSFromDisk(instance.os)
2682 env = OSEnvironment(instance, inst_os)
2684 if mode == constants.IEM_IMPORT:
2685 env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2686 env["IMPORT_INDEX"] = str(disk_index)
2687 script = inst_os.import_script
2689 elif mode == constants.IEM_EXPORT:
2690 env["EXPORT_DEVICE"] = real_disk.dev_path
2691 env["EXPORT_INDEX"] = str(disk_index)
2692 script = inst_os.export_script
2694 # TODO: Pass special environment only to script
2695 script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2697 if mode == constants.IEM_IMPORT:
2698 suffix = "| %s" % script_cmd
2700 elif mode == constants.IEM_EXPORT:
2701 prefix = "%s |" % script_cmd
2703 # Let script predict size
2704 exp_size = constants.IE_CUSTOM_SIZE
2707 _Fail("Invalid %s I/O mode %r", mode, ieio)
2709 return (env, prefix, suffix, exp_size)
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  @rtype: string
  @return: path of the newly created, unique status directory

  """
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))
2721 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2722 """Starts an import or export daemon.
2724 @param mode: Import/output mode
2725 @type opts: L{objects.ImportExportOptions}
2726 @param opts: Daemon options
2728 @param host: Remote host for export (None for import)
2730 @param port: Remote port for export (None for import)
2731 @type instance: L{objects.Instance}
2732 @param instance: Instance object
2733 @param ieio: Input/output type
2734 @param ieioargs: Input/output arguments
2737 if mode == constants.IEM_IMPORT:
2740 if not (host is None and port is None):
2741 _Fail("Can not specify host or port on import")
2743 elif mode == constants.IEM_EXPORT:
2746 if host is None or port is None:
2747 _Fail("Host and port must be specified for an export")
2750 _Fail("Invalid mode %r", mode)
2752 if (opts.key_name is None) ^ (opts.ca_pem is None):
2753 _Fail("Cluster certificate can only be used for both key and CA")
2755 (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2756 _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2758 if opts.key_name is None:
2760 key_path = constants.NODED_CERT_FILE
2761 cert_path = constants.NODED_CERT_FILE
2762 assert opts.ca_pem is None
2764 (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2766 assert opts.ca_pem is not None
2768 for i in [key_path, cert_path]:
2769 if not os.path.exists(i):
2770 _Fail("File '%s' does not exist" % i)
2772 status_dir = _CreateImportExportStatusDir(prefix)
2774 status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2775 pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2776 ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2778 if opts.ca_pem is None:
2780 ca = utils.ReadFile(constants.NODED_CERT_FILE)
2785 utils.WriteFile(ca_file, data=ca, mode=0400)
2788 constants.IMPORT_EXPORT_DAEMON,
2790 "--key=%s" % key_path,
2791 "--cert=%s" % cert_path,
2792 "--ca=%s" % ca_file,
2796 cmd.append("--host=%s" % host)
2799 cmd.append("--port=%s" % port)
2802 cmd.append("--compress=%s" % opts.compress)
2805 cmd.append("--magic=%s" % opts.magic)
2807 if exp_size is not None:
2808 cmd.append("--expected-size=%s" % exp_size)
2811 cmd.append("--cmd-prefix=%s" % cmd_prefix)
2814 cmd.append("--cmd-suffix=%s" % cmd_suffix)
2816 logfile = _InstanceLogName(prefix, instance.os, instance.name)
2818 # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2819 # support for receiving a file descriptor for output
2820 utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2823 # The import/export name is simply the status directory name
2824 return os.path.basename(status_dir)
2827 shutil.rmtree(status_dir, ignore_errors=True)
2831 def GetImportExportStatus(names):
2832 """Returns import/export daemon status.
2834 @type names: sequence
2835 @param names: List of names
2836 @rtype: List of dicts
2837 @return: Returns a list of the state of each named import/export or None if a
2838 status couldn't be read
2844 status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2848 data = utils.ReadFile(status_file)
2849 except EnvironmentError, err:
2850 if err.errno != errno.ENOENT:
2858 result.append(serializer.LoadJson(data))
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  @type name: string
  @param name: Import/export name (status directory name)

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    # the process may have exited between reading the PID and killing;
    # IgnoreProcessNotFound swallows exactly that race
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  @type name: string
  @param name: Import/export name (status directory name)

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    # don't wait for the process, the status dir is removed regardless
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @rtype: list of L{bdev.BlockDev}
  @return: the block device instances for all given disks

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs
2918 def DrbdDisconnectNet(nodes_ip, disks):
2919 """Disconnects the network on a list of drbd devices.
2922 bdevs = _FindDisks(nodes_ip, disks)
2928 except errors.BlockDeviceError, err:
2929 _Fail("Can't change network configuration to standalone mode: %s",
2933 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2934 """Attaches the network on a list of drbd devices.
2937 bdevs = _FindDisks(nodes_ip, disks)
2940 for idx, rd in enumerate(bdevs):
2942 _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2943 except EnvironmentError, err:
2944 _Fail("Can't create symlink: %s", err)
2945 # reconnect disks, switch to new master configuration and if
2946 # needed primary mode
2949 rd.AttachNet(multimaster)
2950 except errors.BlockDeviceError, err:
2951 _Fail("Can't change network configuration: %s", err)
2953 # wait until the disks are connected; we need to retry the re-attach
2954 # if the device becomes standalone, as this might happen if the one
2955 # node disconnects and reconnects in a different mode before the
2956 # other node reconnects; in this case, one or both of the nodes will
2957 # decide it has wrong configuration and switch to standalone
2960 all_connected = True
2963 stats = rd.GetProcStatus()
2965 all_connected = (all_connected and
2966 (stats.is_connected or stats.is_in_resync))
2968 if stats.is_standalone:
2969 # peer had different config info and this node became
2970 # standalone, even though this should not happen with the
2971 # new staged way of changing disk configs
2973 rd.AttachNet(multimaster)
2974 except errors.BlockDeviceError, err:
2975 _Fail("Can't change network configuration: %s", err)
2977 if not all_connected:
2978 raise utils.RetryAgain()
2981 # Start with a delay of 100 miliseconds and go up to 5 seconds
2982 utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2983 except utils.RetryTimeout:
2984 _Fail("Timeout in disk reconnecting")
2987 # change to primary mode
2991 except errors.BlockDeviceError, err:
2992 _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @rtype: tuple (alldone, min_resync)
  @return: whether all devices finished resyncing and the minimum
      sync percentage seen across the devices

  """
  def _helper(rd):
    # succeeds only once the device is connected or resyncing
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check before failing
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
3025 def GetDrbdUsermodeHelper():
3026 """Returns DRBD usermode helper currently configured.
3030 return bdev.BaseDRBD.GetUsermodeHelper()
3031 except errors.BlockDeviceError, err:
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # parent: answer the RPC and let the child do the dirty work
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running on ram
  try:
    utils.Mlockall()
  except Exception: # pylint: disable-msg=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # feed the input data to the script via a temporary file
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
3181 class DevCacheManager(object):
3182 """Simple class for managing a cache of block device information.
3185 _DEV_PREFIX = "/dev/"
3186 _ROOT_DIR = constants.BDEV_CACHE_DIR
3189 def _ConvertPath(cls, dev_path):
3190 """Converts a /dev/name path to the cache file name.
3192 This replaces slashes with underscores and strips the /dev
3193 prefix. It then returns the full path to the cache file.
3196 @param dev_path: the C{/dev/} path name
3198 @return: the converted path name
3201 if dev_path.startswith(cls._DEV_PREFIX):
3202 dev_path = dev_path[len(cls._DEV_PREFIX):]
3203 dev_path = dev_path.replace("/", "_")
3204 fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3208 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3209 """Updates the cache information for a given device.
3212 @param dev_path: the pathname of the device
3214 @param owner: the owner (instance name) of the device
3215 @type on_primary: bool
3216 @param on_primary: whether this is the primary
3219 @param iv_name: the instance-visible name of the
3220 device, as in objects.Disk.iv_name
3225 if dev_path is None:
3226 logging.error("DevCacheManager.UpdateCache got a None dev_path")
3228 fpath = cls._ConvertPath(dev_path)
3234 iv_name = "not_visible"
3235 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3237 utils.WriteFile(fpath, data=fdata)
3238 except EnvironmentError, err:
3239 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3242 def RemoveCache(cls, dev_path):
3243 """Remove data for a dev_path.
3245 This is just a wrapper over L{utils.RemoveFile} with a converted
3246 path name and logging.
3249 @param dev_path: the pathname of the device
3254 if dev_path is None:
3255 logging.error("DevCacheManager.RemoveCache got a None dev_path")
3257 fpath = cls._ConvertPath(dev_path)
3259 utils.RemoveFile(fpath)
3260 except EnvironmentError, err:
3261 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)