4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25 the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
in the L{_CleanDirectory} function

"""
31 # pylint: disable-msg=E1103
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"

# Directories _CleanDirectory is allowed to operate on; any other path is
# rejected so a bad RPC argument cannot wipe an arbitrary directory
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  constants.CRYPTO_KEYS_DIR,
  ])

_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60  # one week, in seconds

# file names used inside a temporary X509 key/cert directory
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"

# status file name for import/export daemon directories
_IES_STATUS_FILE = "status"
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """
def _Fail(msg, *args, **kwargs):
  """Log an error and the raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail: always

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]:  # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
112 """Simple wrapper to return a SimpleStore.
114 @rtype: L{ssconf.SimpleStore}
115 @return: a SimpleStore instance
118 return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    # only remove regular files, never symlinks or subdirectories
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  @rtype: frozenset
  @return: the set of file paths that UploadFile may write to

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  # each hypervisor may declare additional ancillary files it needs
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
212 """Removes job queue files and archived jobs.
218 _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
219 _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
223 """Returns master information.
225 This is an utility function to compute master information, either
226 for consumption here or from the node daemon.
229 @return: master_netdev, master_ip, master_name, primary_ip_family
230 @raise RPCFail: in case of errors
235 master_netdev = cfg.GetMasterNetdev()
236 master_ip = cfg.GetMasterIP()
237 master_node = cfg.GetMasterNode()
238 primary_ip_family = cfg.GetPrimaryIPFamily()
239 except errors.ConfigurationError, err:
240 _Fail("Cluster configuration incomplete: %s", err, exc=True)
241 return (master_netdev, master_ip, master_node, primary_ip_family)
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will either try activate the IP address of the master
  (unless someone else has it) or also start the master daemons, based
  on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to start the master daemons
      (ganeti-masterd and ganeti-rapi), or (if false) activate the
      master ip
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None

  """
  # collect failures instead of aborting, so all steps are attempted
  err_msgs = []
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  if start_daemons:
    # either start the master and rapi daemons
    if no_voting:
      masterd_args = "--no-voting --yes-do-it"
    else:
      masterd_args = ""

    env = {
      "EXTRA_MASTERD_ARGS": masterd_args,
      }

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      msg = "Can't start Ganeti master: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)
  else:
    # or activate the IP on this node
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
      if netutils.IPAddress.Own(master_ip):
        # we already have the ip:
        logging.debug("Master IP already configured, doing nothing")
      else:
        msg = "Someone else has the master ip, not activating"
        logging.error(msg)
        err_msgs.append(msg)
    else:
      ipcls = netutils.IP4Address
      if family == netutils.IP6Address.family:
        ipcls = netutils.IP6Address

      result = utils.RunCmd(["ip", "address", "add",
                             "%s/%d" % (master_ip, ipcls.iplen),
                             "dev", master_netdev, "label",
                             "%s:0" % master_netdev])
      if result.failed:
        msg = "Can't activate master IP: %s" % result.output
        logging.error(msg)
        err_msgs.append(msg)

      # we ignore the exit code of the following cmds
      if ipcls == netutils.IP4Address:
        utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
                      master_ip, master_ip])
      elif ipcls == netutils.IP6Address:
        try:
          utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
        except errors.OpExecError:
          # TODO: Better error reporting
          logging.warning("Can't execute ndisc6, please install if missing")

  if err_msgs:
    _Fail("; ".join(err_msgs))
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  ipcls = netutils.IP4Address
  if family == netutils.IP6Address.family:
    ipcls = netutils.IP6Address

  result = utils.RunCmd(["ip", "address", "del",
                         "%s/%d" % (master_ip, ipcls.iplen),
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                    " and error %s",
                    result.cmd, result.exit_code, result.output)
def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      # NOTE: this used to instantiate RPCFail(...) without raising it,
      # silently ignoring the invalid call; _Fail logs and raises
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " given")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable-msg=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  outputarray = {}

  vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
  vg_free = vg_size = None
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  outputarray['vg_size'] = vg_size
  outputarray['vg_free'] = vg_free

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # the boot id changes on every reboot, letting the master detect reboots
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return outputarray
462 def VerifyNode(what, cluster_name):
463 """Verify the status of the local node.
465 Based on the input L{what} parameter, various checks are done on the
468 If the I{filelist} key is present, this list of
469 files is checksummed and the file/checksum pairs are returned.
471 If the I{nodelist} key is present, we check that we have
472 connectivity via ssh with the target nodes (and check the hostname
475 If the I{node-net-test} key is present, we check that we have
476 connectivity to the given nodes via both primary IP and, if
477 applicable, secondary IPs.
480 @param what: a dictionary of things to check:
481 - filelist: list of files for which to compute checksums
482 - nodelist: list of nodes we should check ssh communication with
483 - node-net-test: list of nodes we should check node daemon port
485 - hypervisor: list with hypervisors to run the verify for
487 @return: a dictionary with the same keys as the input dict, and
488 values representing the result of the checks
492 my_name = netutils.Hostname.GetSysName()
493 port = netutils.GetDaemonPort(constants.NODED)
495 if constants.NV_HYPERVISOR in what:
496 result[constants.NV_HYPERVISOR] = tmp = {}
497 for hv_name in what[constants.NV_HYPERVISOR]:
499 val = hypervisor.GetHypervisor(hv_name).Verify()
500 except errors.HypervisorError, err:
501 val = "Error while checking hypervisor: %s" % str(err)
504 if constants.NV_FILELIST in what:
505 result[constants.NV_FILELIST] = utils.FingerprintFiles(
506 what[constants.NV_FILELIST])
508 if constants.NV_NODELIST in what:
509 result[constants.NV_NODELIST] = tmp = {}
510 random.shuffle(what[constants.NV_NODELIST])
511 for node in what[constants.NV_NODELIST]:
512 success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
516 if constants.NV_NODENETTEST in what:
517 result[constants.NV_NODENETTEST] = tmp = {}
518 my_pip = my_sip = None
519 for name, pip, sip in what[constants.NV_NODENETTEST]:
525 tmp[my_name] = ("Can't find my own primary/secondary IP"
528 for name, pip, sip in what[constants.NV_NODENETTEST]:
530 if not netutils.TcpPing(pip, port, source=my_pip):
531 fail.append("primary")
533 if not netutils.TcpPing(sip, port, source=my_sip):
534 fail.append("secondary")
536 tmp[name] = ("failure using the %s interface(s)" %
539 if constants.NV_MASTERIP in what:
540 # FIXME: add checks on incoming data structures (here and in the
541 # rest of the function)
542 master_name, master_ip = what[constants.NV_MASTERIP]
543 if master_name == my_name:
544 source = constants.IP4_ADDRESS_LOCALHOST
547 result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
550 if constants.NV_LVLIST in what:
552 val = GetVolumeList(what[constants.NV_LVLIST])
555 result[constants.NV_LVLIST] = val
557 if constants.NV_INSTANCELIST in what:
558 # GetInstanceList can fail
560 val = GetInstanceList(what[constants.NV_INSTANCELIST])
563 result[constants.NV_INSTANCELIST] = val
565 if constants.NV_VGLIST in what:
566 result[constants.NV_VGLIST] = utils.ListVolumeGroups()
568 if constants.NV_PVLIST in what:
569 result[constants.NV_PVLIST] = \
570 bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
571 filter_allocatable=False)
573 if constants.NV_VERSION in what:
574 result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
575 constants.RELEASE_VERSION)
577 if constants.NV_HVINFO in what:
578 hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
579 result[constants.NV_HVINFO] = hyper.GetNodeInfo()
581 if constants.NV_DRBDLIST in what:
583 used_minors = bdev.DRBD8.GetUsedDevs().keys()
584 except errors.BlockDeviceError, err:
585 logging.warning("Can't get used minors list", exc_info=True)
586 used_minors = str(err)
587 result[constants.NV_DRBDLIST] = used_minors
589 if constants.NV_DRBDHELPER in what:
592 payload = bdev.BaseDRBD.GetUsermodeHelper()
593 except errors.BlockDeviceError, err:
594 logging.error("Can't get DRBD usermode helper: %s", str(err))
597 result[constants.NV_DRBDHELPER] = (status, payload)
599 if constants.NV_NODESETUP in what:
600 result[constants.NV_NODESETUP] = tmpr = []
601 if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
602 tmpr.append("The sysfs filesytem doesn't seem to be mounted"
603 " under /sys, missing required directories /sys/block"
604 " and /sys/class/net")
605 if (not os.path.isdir("/proc/sys") or
606 not os.path.isfile("/proc/sysrq-trigger")):
607 tmpr.append("The procfs filesystem doesn't seem to be mounted"
608 " under /proc, missing required directory /proc/sys and"
609 " the file /proc/sysrq-trigger")
611 if constants.NV_TIME in what:
612 result[constants.NV_TIME] = utils.SplitTime(time.time())
614 if constants.NV_OSLIST in what:
615 result[constants.NV_OSLIST] = DiagnoseOS()
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  # lv_attr is a fixed-width 6-char flag field; see lvs(8)
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[name] = (size, inactive, online)

  return lvs
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
676 """List all volumes on this node.
680 A list of dictionaries, each having four keys:
681 - name: the logical volume name,
682 - size: the size of the logical volume
683 - dev: the physical device on which the LV lives
684 - vg: the volume group to which it belongs
686 In case of errors, we return an empty list and log the
689 Note that since a logical volume can live on multiple physical
690 volumes, the resulting list might include a logical volume
694 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
696 "--options=lv_name,lv_size,devices,vg_name"])
698 _Fail("Failed to list logical volumes, lvs output: %s",
702 return dev.split('(')[0]
705 return [parse_dev(x) for x in dev.split(",")]
708 line = [v.strip() for v in line]
709 return [{'name': line[0], 'size': line[1],
710 'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
713 for line in result.stdout.splitlines():
714 if line.count('|') >= 3:
715 all_devs.extend(map_line(line.split('|')))
717 logging.warning("Strange line in the output from lvs: '%s'", line)
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
737 def GetInstanceList(hypervisor_list):
738 """Provides a list of instances.
740 @type hypervisor_list: list
741 @param hypervisor_list: the list of hypervisors to query information
744 @return: a list of all running instances on the current node
745 - instance1.example.com
746 - instance2.example.com
750 for hname in hypervisor_list:
752 names = hypervisor.GetHypervisor(hname).ListInstances()
753 results.extend(names)
754 except errors.HypervisorError, err:
755 _Fail("Error enumerating instances (hypervisor %s): %s",
756 hname, err, exc=True)
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx, _ in enumerate(instance.disks):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output
def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  base = ("%s-%s-%s-%s.log" %
          (kind, os_name, instance, utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, base)
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    # log=False: the failure details were already logged above
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    logging.error("os create command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    # log=False: the failure details were already logged above
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Compute the path of the symlink for one instance disk."""
  link_basename = "%s:%d" % (instance_name, idx)
  return utils.PathJoin(constants.DISK_LINKS_DIR, link_basename)
944 def _SymlinkBlockDev(instance_name, device_path, idx):
945 """Set up symlinks to a instance's block device.
947 This is an auxiliary function run when an instance is start (on the primary
948 node) or when an instance is migrated (on the target node).
951 @param instance_name: the name of the target instance
952 @param device_path: path of the physical block device, on the node
953 @param idx: the disk index
954 @return: absolute path to the disk's symlink
957 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
959 os.symlink(device_path, link_name)
961 if err.errno == errno.EEXIST:
962 if (not os.path.islink(link_name) or
963 os.readlink(link_name) != device_path):
965 os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @param instance_name: the name of the instance
  @param disks: the instance's disk objects (only their count is used)

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        # best-effort cleanup: log but do not abort on a stale link
        logging.exception("Can't remove symlink '%s'", link_name)
985 def _GatherAndLinkBlockDevs(instance):
986 """Set up an instance's block device(s).
988 This is run on the primary node at instance startup. The block
989 devices must be already assembled.
991 @type instance: L{objects.Instance}
992 @param instance: the instance whose disks we shoul assemble
994 @return: list of (disk_object, device_path)
998 for idx, disk in enumerate(instance.disks):
999 device = _RecursiveFindBD(disk)
1001 raise errors.BlockDeviceError("Block device '%s' is not set up." %
1005 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1007 raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1010 block_devices.append((disk, link_name))
1012 return block_devices
1015 def StartInstance(instance):
1016 """Start an instance.
1018 @type instance: L{objects.Instance}
1019 @param instance: the instance object
1023 running_instances = GetInstanceList([instance.hypervisor])
1025 if instance.name in running_instances:
1026 logging.info("Instance %s already running, not starting", instance.name)
1030 block_devices = _GatherAndLinkBlockDevs(instance)
1031 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1032 hyper.StartInstance(instance, block_devices)
1033 except errors.BlockDeviceError, err:
1034 _Fail("Block device error: %s", err, exc=True)
1035 except errors.HypervisorError, err:
1036 _RemoveBlockDevLinks(instance.name, instance.disks)
1037 _Fail("Hypervisor error: %s", err, exc=True)
1040 def InstanceShutdown(instance, timeout):
1041 """Shut an instance down.
1043 @note: this functions uses polling with a hardcoded timeout.
1045 @type instance: L{objects.Instance}
1046 @param instance: the instance object
1047 @type timeout: integer
1048 @param timeout: maximum timeout for soft shutdown
1052 hv_name = instance.hypervisor
1053 hyper = hypervisor.GetHypervisor(hv_name)
1054 iname = instance.name
1056 if instance.name not in hyper.ListInstances():
1057 logging.info("Instance %s not running, doing nothing", iname)
1062 self.tried_once = False
1065 if iname not in hyper.ListInstances():
1069 hyper.StopInstance(instance, retry=self.tried_once)
1070 except errors.HypervisorError, err:
1071 if iname not in hyper.ListInstances():
1072 # if the instance is no longer existing, consider this a
1073 # success and go to cleanup
1076 _Fail("Failed to stop instance %s: %s", iname, err)
1078 self.tried_once = True
1080 raise utils.RetryAgain()
1083 utils.Retry(_TryShutdown(), 5, timeout)
1084 except utils.RetryTimeout:
1085 # the shutdown did not succeed
1086 logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1089 hyper.StopInstance(instance, force=True)
1090 except errors.HypervisorError, err:
1091 if iname in hyper.ListInstances():
1092 # only raise an error if the instance still exists, otherwise
1093 # the error could simply be "instance ... unknown"!
1094 _Fail("Failed to force stop instance %s: %s", iname, err)
1098 if iname in hyper.ListInstances():
1099 _Fail("Could not shutdown instance %s even by destroy", iname)
1102 hyper.CleanupInstance(instance.name)
1103 except errors.HypervisorError, err:
1104 logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1106 _RemoveBlockDevLinks(iname, instance.disks)
1109 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1110 """Reboot an instance.
1112 @type instance: L{objects.Instance}
1113 @param instance: the instance object to reboot
1114 @type reboot_type: str
1115 @param reboot_type: the type of reboot, one the following
1117 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1118 instance OS, do not recreate the VM
1119 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1120 restart the VM (at the hypervisor level)
1121 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1122 not accepted here, since that mode is handled differently, in
1123 cmdlib, and translates into full stop and start of the
1124 instance (instead of a call_instance_reboot RPC)
1125 @type shutdown_timeout: integer
1126 @param shutdown_timeout: maximum timeout for soft shutdown
1130 running_instances = GetInstanceList([instance.hypervisor])
1132 if instance.name not in running_instances:
1133 _Fail("Cannot reboot instance %s that is not running", instance.name)
1135 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1136 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1138 hyper.RebootInstance(instance)
1139 except errors.HypervisorError, err:
1140 _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1141 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1143 InstanceShutdown(instance, shutdown_timeout)
1144 return StartInstance(instance)
1145 except errors.HypervisorError, err:
1146 _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1148 _Fail("Invalid reboot_type received: %s", reboot_type)
1151 def MigrationInfo(instance):
1152 """Gather information about an instance to be migrated.
1154 @type instance: L{objects.Instance}
1155 @param instance: the instance definition
1158 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1160 info = hyper.MigrationInfo(instance)
1161 except errors.HypervisorError, err:
1162 _Fail("Failed to fetch migration information: %s", err, exc=True)
1166 def AcceptInstance(instance, info, target):
1167 """Prepare the node to accept an instance.
1169 @type instance: L{objects.Instance}
1170 @param instance: the instance definition
1171 @type info: string/data (opaque)
1172 @param info: migration information, from the source node
1173 @type target: string
1174 @param target: target host (usually ip), on this node
1177 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1179 hyper.AcceptInstance(instance, info, target)
1180 except errors.HypervisorError, err:
1181 _Fail("Failed to accept instance: %s", err, exc=True)
1184 def FinalizeMigration(instance, info, success):
# NOTE(review): numbered listing with elided lines (the "try:" opener before
# hyper.FinalizeMigration is not visible); comments only, code untouched.
1185   """Finalize any preparation to accept an instance.
1187   @type instance: L{objects.Instance}
1188   @param instance: the instance definition
1189   @type info: string/data (opaque)
1190   @param info: migration information, from the source node
1191   @type success: boolean
1192   @param success: whether the migration was a success or a failure
# Hypervisor failures become RPCFail (with traceback, via exc=True).
1195   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1197     hyper.FinalizeMigration(instance, info, success)
1198   except errors.HypervisorError, err:
1199     _Fail("Failed to finalize migration: %s", err, exc=True)
1202 def MigrateInstance(instance, target, live):
# NOTE(review): numbered listing with elided lines (the "try:" opener before
# hyper.MigrateInstance is not visible); comments only, code untouched.
1203   """Migrates an instance to another node.
1205   @type instance: L{objects.Instance}
1206   @param instance: the instance definition
1207   @type target: string
1208   @param target: the target node name
1210   @param live: whether the migration should be done live or not (the
1211     interpretation of this parameter is left to the hypervisor)
1213   @return: a tuple of (success, msg) where:
1214     - success is a boolean denoting the success/failure of the operation
1215     - msg is a string with details in case of failure
# Hypervisor failures become RPCFail (with traceback, via exc=True).
1218   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1221     hyper.MigrateInstance(instance, target, live)
1222   except errors.HypervisorError, err:
1223     _Fail("Failed to migrate instance: %s", err, exc=True)
1226 def BlockdevCreate(disk, size, owner, on_primary, info):
# NOTE(review): numbered listing with elided lines (several "try:" openers,
# the "clist" initialization and child read-write calls are not visible);
# comments only, code untouched.
1227   """Creates a block device for an instance.
1229   @type disk: L{objects.Disk}
1230   @param disk: the object describing the disk we should create
1232   @param size: the size of the physical underlying device, in MiB
1234   @param owner: the name of the instance for which disk is created,
1235     used for device cache data
1236   @type on_primary: boolean
1237   @param on_primary: indicates if it is the primary node or not
1239   @param info: string that will be sent to the physical device
1240     creation, used for example to set (LVM) tags on LVs
1242   @return: the new unique_id of the device (this can sometime be
1243     computed only after creation), or None. On secondary nodes,
1244     it's not required to return anything.
1247   # TODO: remove the obsolete 'size' argument
1248   # pylint: disable-msg=W0613
# First assemble all children; a device cannot be created on top of
# inactive backing devices.
1251   for child in disk.children:
1253     crdev = _RecursiveAssembleBD(child, owner, on_primary)
1254   except errors.BlockDeviceError, err:
1255     _Fail("Can't assemble device %s: %s", child, err)
1256   if on_primary or disk.AssembleOnSecondary():
1257     # we need the children open in case the device itself has to
1260     # pylint: disable-msg=E1103
1262   except errors.BlockDeviceError, err:
1263     _Fail("Can't make child '%s' read-write: %s", child, err)
# "clist" is presumably the list of assembled child devices built in the
# elided lines above — TODO confirm against the full source.
1267   device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1268   except errors.BlockDeviceError, err:
1269     _Fail("Can't create block device: %s", err)
1271   if on_primary or disk.AssembleOnSecondary():
1274   except errors.BlockDeviceError, err:
1275     _Fail("Can't assemble device after creation, unusual event: %s", err)
1276   device.SetSyncSpeed(constants.SYNC_SPEED)
1277   if on_primary or disk.OpenOnSecondary():
1279     device.Open(force=True)
1280   except errors.BlockDeviceError, err:
1281     _Fail("Can't make device r/w after creation, unusual event: %s", err)
# Record the new device in the local device cache for later lookups.
1282   DevCacheManager.UpdateCache(device.dev_path, owner,
1283                               on_primary, disk.iv_name)
1285   device.SetInfo(info)
1287   return device.unique_id
1290 def _WipeDevice(path):
# NOTE(review): numbered listing with elided lines (the "if result.failed:"
# style guard before the _Fail call is not visible); comments only.
1291   """This function actually wipes the device.
1293   @param path: The path to the device to wipe
# Runs the external wipe command on the (shell-quoted) device path;
# assumes constants.WIPE_CMD already carries any needed trailing
# separator for the "%s%s" concatenation — TODO confirm.
1296   result = utils.RunCmd("%s%s" % (constants.WIPE_CMD, utils.ShellQuote(path)))
1299     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1300           result.fail_reason, result.output)
1303 def BlockdevWipe(disk):
# NOTE(review): numbered listing with elided lines (the "try:" opener and a
# probable "rdev is None" check are not visible); comments only.
1304   """Wipes a block device.
1306   @type disk: L{objects.Disk}
1307   @param disk: the disk object we want to wipe
# Resolve the logical disk to the real block device, then wipe its path.
1311     rdev = _RecursiveFindBD(disk)
1312   except errors.BlockDeviceError, err:
1313     _Fail("Cannot execute wipe for device %s: device not found", err)
1315   _WipeDevice(rdev.dev_path)
1318 def BlockdevRemove(disk):
# NOTE(review): numbered listing with elided lines (the "msgs = []"
# initialization, the "try:" openers and the rdev.Remove() call are not
# visible); comments only, code untouched.
1319   """Remove a block device.
1321   @note: This is intended to be called recursively.
1323   @type disk: L{objects.Disk}
1324   @param disk: the disk object we should remove
1326   @return: the success of the operation
1331     rdev = _RecursiveFindBD(disk)
1332   except errors.BlockDeviceError, err:
1333     # probably can't attach
1334     logging.info("Can't attach to device %s in remove", disk)
1336   if rdev is not None:
1337     r_path = rdev.dev_path
# Errors are accumulated in "msgs" so all children are still attempted.
1340     except errors.BlockDeviceError, err:
1341       msgs.append(str(err))
1343       DevCacheManager.RemoveCache(r_path)
1346   for child in disk.children:
1348       BlockdevRemove(child)
1349     except RPCFail, err:
1350       msgs.append(str(err))
# One combined RPCFail is raised if anything went wrong along the way.
1353     _Fail("; ".join(msgs))
1356 def _RecursiveAssembleBD(disk, owner, as_primary):
# NOTE(review): numbered listing with elided lines (the "children = []"
# initialization, the "try:" opener, the re-raise branch and the final
# return/Assemble/Open calls are not all visible); comments only.
1357   """Activate a block device for an instance.
1359   This is run on the primary and secondary nodes for an instance.
1361   @note: this function is called recursively.
1363   @type disk: L{objects.Disk}
1364   @param disk: the disk we try to assemble
1366   @param owner: the name of the instance which owns the disk
1367   @type as_primary: boolean
1368   @param as_primary: if we should make the block device
1371   @return: the assembled device or None (in case no device
1373   @raise errors.BlockDeviceError: in case there is an error
1374     during the activation of the children or the device
# "mcn" is the maximum number of missing (None) children tolerated,
# derived from the device type's ChildrenNeeded() contract.
1380     mcn = disk.ChildrenNeeded()
1382     mcn = 0 # max number of Nones allowed
1384     mcn = len(disk.children) - mcn # max number of Nones
1385   for chld_disk in disk.children:
1387       cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1388     except errors.BlockDeviceError, err:
# If too many children already failed, give up; otherwise log and go on
# with a None placeholder (degraded but still assemblable device).
1389       if children.count(None) >= mcn:
1392       logging.error("Error in child activation (but continuing): %s",
1394     children.append(cdev)
1396   if as_primary or disk.AssembleOnSecondary():
1397     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1398     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1400     if as_primary or disk.OpenOnSecondary():
1402     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1403                                 as_primary, disk.iv_name)
1410 def BlockdevAssemble(disk, owner, as_primary):
# NOTE(review): numbered listing with elided lines (the "try:" opener and the
# final "return result" are not visible); comments only, code untouched.
1411   """Activate a block device for an instance.
1413   This is a wrapper over _RecursiveAssembleBD.
1415   @rtype: str or boolean
1416   @return: a C{/dev/...} path for primary nodes, and
1417     C{True} for secondary nodes
1421     result = _RecursiveAssembleBD(disk, owner, as_primary)
# On the primary the assembled BlockDev object is flattened to its path;
# secondaries keep the plain boolean from the recursive helper.
1422     if isinstance(result, bdev.BlockDev):
1423       # pylint: disable-msg=E1103
1424       result = result.dev_path
1425   except errors.BlockDeviceError, err:
1426     _Fail("Error while assembling disk: %s", err, exc=True)
1431 def BlockdevShutdown(disk):
# NOTE(review): numbered listing with elided lines (the "msgs = []"
# initialization, "try:" openers and the r_dev.Shutdown() call are not
# visible); comments only, code untouched.
1432   """Shut down a block device.
1434   First, if the device is assembled (Attach() is successful), then
1435   the device is shutdown. Then the children of the device are
1438   This function is called recursively. Note that we don't cache the
1439   children or such, as opposed to assemble, shutdown of different
1440   devices doesn't require that the upper device was active.
1442   @type disk: L{objects.Disk}
1443   @param disk: the description of the disk we should
1449   r_dev = _RecursiveFindBD(disk)
1450   if r_dev is not None:
1451     r_path = r_dev.dev_path
# On successful shutdown the cache entry for the device is dropped.
1454       DevCacheManager.RemoveCache(r_path)
1455     except errors.BlockDeviceError, err:
1456       msgs.append(str(err))
1459   for child in disk.children:
1461       BlockdevShutdown(child)
1462     except RPCFail, err:
1463       msgs.append(str(err))
# All accumulated errors are reported in a single RPCFail.
1466     _Fail("; ".join(msgs))
1469 def BlockdevAddchildren(parent_cdev, new_cdevs):
# NOTE(review): numbered listing; some source lines may be elided.
# Comments only, code untouched.
1470   """Extend a mirrored block device.
1472   @type parent_cdev: L{objects.Disk}
1473   @param parent_cdev: the disk to which we should add children
1474   @type new_cdevs: list of L{objects.Disk}
1475   @param new_cdevs: the list of children which we should add
# Both the parent and every new child must already be resolvable to real
# devices; any None in the lookups aborts the operation via _Fail.
1479   parent_bdev = _RecursiveFindBD(parent_cdev)
1480   if parent_bdev is None:
1481     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1482   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1483   if new_bdevs.count(None) > 0:
1484     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1485   parent_bdev.AddChildren(new_bdevs)
1488 def BlockdevRemovechildren(parent_cdev, new_cdevs):
# NOTE(review): numbered listing with elided lines (the "devs = []"
# initialization and the branches around StaticDevPath appear truncated);
# comments only, code untouched.
1489   """Shrink a mirrored block device.
1491   @type parent_cdev: L{objects.Disk}
1492   @param parent_cdev: the disk from which we should remove children
1493   @type new_cdevs: list of L{objects.Disk}
1494   @param new_cdevs: the list of children which we should remove
1498   parent_bdev = _RecursiveFindBD(parent_cdev)
1499   if parent_bdev is None:
1500     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
# For each child, prefer its static device path; otherwise fall back to
# resolving the live device and using its dev_path.
1502   for disk in new_cdevs:
1503     rpath = disk.StaticDevPath()
1505       bd = _RecursiveFindBD(disk)
1507         _Fail("Can't find device %s while removing children", disk)
1509         devs.append(bd.dev_path)
1511       if not utils.IsNormAbsPath(rpath):
1512         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1514   parent_bdev.RemoveChildren(devs)
1517 def BlockdevGetmirrorstatus(disks):
# NOTE(review): numbered listing with elided lines (the "stats = []"
# initialization, the loop header over disks and the final return are not
# visible); comments only, code untouched.
1518   """Get the mirroring status of a list of devices.
1520   @type disks: list of L{objects.Disk}
1521   @param disks: the list of disks which we should query
1524     a list of (mirror_done, estimated_time) tuples, which
1525     are the result of L{bdev.BlockDev.CombinedSyncStatus}
1526   @raise errors.BlockDeviceError: if any of the disks cannot be
# Each disk must resolve to a real device; missing ones abort via _Fail.
1532     rbd = _RecursiveFindBD(dsk)
1534       _Fail("Can't find device %s", dsk)
1536     stats.append(rbd.CombinedSyncStatus())
1541 def _RecursiveFindBD(disk):
# NOTE(review): numbered listing with elided lines (the "children = []"
# initialization and the "if disk.children:" guard are not visible);
# comments only, code untouched.
1542   """Check if a device is activated.
1544   If so, return information about the real device.
1546   @type disk: L{objects.Disk}
1547   @param disk: the disk object we need to find
1549   @return: None if the device can't be found,
1550     otherwise the device instance
# Resolve children first (recursively), then look the device itself up.
1555     for chdisk in disk.children:
1556       children.append(_RecursiveFindBD(chdisk))
1558   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1561 def _OpenRealBD(disk):
# NOTE(review): numbered listing with elided lines (the code after the None
# check — presumably opening and returning the device — is not visible);
# comments only, code untouched.
1562   """Opens the underlying block device of a disk.
1564   @type disk: L{objects.Disk}
1565   @param disk: the disk object we want to open
1568   real_disk = _RecursiveFindBD(disk)
1569   if real_disk is None:
1570     _Fail("Block device '%s' is not set up", disk)
1577 def BlockdevFind(disk):
# NOTE(review): numbered listing with elided lines (the "try:" opener and a
# probable "rbd is None -> return None" branch are not visible);
# comments only, code untouched.
1578   """Check if a device is activated.
1580   If it is, return information about the real device.
1582   @type disk: L{objects.Disk}
1583   @param disk: the disk to find
1584   @rtype: None or objects.BlockDevStatus
1585   @return: None if the disk cannot be found, otherwise the current
1590     rbd = _RecursiveFindBD(disk)
1591   except errors.BlockDeviceError, err:
1592     _Fail("Failed to find device: %s", err, exc=True)
1597   return rbd.GetSyncStatus()
1600 def BlockdevGetsize(disks):
# NOTE(review): numbered listing with elided lines (the "result = []"
# initialization, the loop header, the None-append branch and the final
# return are not visible); comments only, code untouched.
1601   """Computes the size of the given disks.
1603   If a disk is not found, returns None instead.
1605   @type disks: list of L{objects.Disk}
1606   @param disks: the list of disk to compute the size for
1608   @return: list with elements None if the disk cannot be found,
# A lookup failure maps to None in the result rather than aborting.
1615     rbd = _RecursiveFindBD(cf)
1616   except errors.BlockDeviceError:
1622     result.append(rbd.GetActualSize())
1626 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
# NOTE(review): numbered listing with elided lines (e.g. the argument list of
# BuildCmd is truncated and the "if result.failed:" style guard is not
# visible); comments only, code untouched.
1627   """Export a block device to a remote node.
1629   @type disk: L{objects.Disk}
1630   @param disk: the description of the disk to export
1631   @type dest_node: str
1632   @param dest_node: the destination node to export to
1633   @type dest_path: str
1634   @param dest_path: the destination path on the target node
1635   @type cluster_name: str
1636   @param cluster_name: the cluster name, needed for SSH hostalias
1640   real_disk = _OpenRealBD(disk)
1642   # the block size on the read dd is 1MiB to match our units
1643   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1644                                "dd if=%s bs=1048576 count=%s",
1645                                real_disk.dev_path, str(disk.size))
1647   # we set here a smaller block size as, due to ssh buffering, more
1648   # than 64-128k will mostly ignored; we use nocreat to fail if the
1649   # device is not already there or we pass a wrong path; we use
1650   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1651   # to not buffer too much memory; this means that at best, we flush
1652   # every 64k, which will not be very fast
1653   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1654                                 " oflag=dsync", dest_path)
# The destination command is wrapped in an ssh invocation to dest_node,
# run as the Ganeti user.
1656   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1657                                                    constants.GANETI_RUNAS,
1660   # all commands have been checked, so we're safe to combine them
1661   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1663   result = utils.RunCmd(["bash", "-c", command])
1666     _Fail("Disk copy command '%s' returned error: %s"
1667           " output: %s", command, result.fail_reason, result.output)
1670 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
# NOTE(review): numbered listing; the second argument of the _Fail call for
# disallowed targets appears truncated. Comments only, code untouched.
1671   """Write a file to the filesystem.
1673   This allows the master to overwrite(!) a file. It will only perform
1674   the operation if the file belongs to a list of configuration files.
1676   @type file_name: str
1677   @param file_name: the target file name
1679   @param data: the new contents of the file
1681   @param mode: the mode to give the file (can be None)
1683   @param uid: the owner of the file (can be -1 for default)
1685   @param gid: the group of the file (can be -1 for default)
1687   @param atime: the atime to set on the file (can be None)
1689   @param mtime: the mtime to set on the file (can be None)
# Only absolute paths from the _ALLOWED_UPLOAD_FILES whitelist (see module
# docstring) may be written; everything else is rejected via _Fail.
1693   if not os.path.isabs(file_name):
1694     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1696   if file_name not in _ALLOWED_UPLOAD_FILES:
1697     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
# Payload arrives compressed/encoded; _Decompress restores the raw bytes
# before the atomic write.
1700   raw_data = _Decompress(data)
1702   utils.SafeWriteFile(file_name, None,
1703                       data=raw_data, mode=mode, uid=uid, gid=gid,
1704                       atime=atime, mtime=mtime)
1707 def WriteSsconfFiles(values):
# Thin wrapper: delegates persisting the ssconf key/value files to
# ssconf.SimpleStore. (Listing elides the docstring closer.)
1708   """Update all ssconf files.
1710   Wrapper around the SimpleStore.WriteFiles.
1713   ssconf.SimpleStore().WriteFiles(values)
1716 def _ErrnoOrStr(err):
# NOTE(review): numbered listing with elided lines (the else-branch using
# str(err) and the final "return detail" are not visible); comments only.
1717   """Format an EnvironmentError exception.
1719   If the L{err} argument has an errno attribute, it will be looked up
1720   and converted into a textual C{E...} description. Otherwise the
1721   string representation of the error will be returned.
1723   @type err: L{EnvironmentError}
1724   @param err: the exception to format
1727   if hasattr(err, 'errno'):
1728     detail = errno.errorcode[err.errno]
1734 def _OSOndiskAPIVersion(os_dir):
# NOTE(review): numbered listing with elided lines (several "try:" openers
# are not visible); comments only, code untouched.
1735   """Compute and return the API version of a given OS.
1737   This function will try to read the API version of the OS residing in
1738   the 'os_dir' directory.
1741   @param os_dir: the directory in which we should look for the OS
1743   @return: tuple (status, data) with status denoting the validity and
1744     data holding either the valid versions or an error message
1747   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
# Missing/unreadable api file is reported as an invalid OS, not raised.
1750     st = os.stat(api_file)
1751   except EnvironmentError, err:
1752     return False, ("Required file '%s' not found under path %s: %s" %
1753                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1755   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1756     return False, ("File '%s' in %s is not a regular file" %
1757                    (constants.OS_API_FILE, os_dir))
1760     api_versions = utils.ReadFile(api_file).splitlines()
1761   except EnvironmentError, err:
1762     return False, ("Error while reading the API version file at %s: %s" %
1763                    (api_file, _ErrnoOrStr(err)))
# One integer version per line; non-numeric content invalidates the OS.
1766     api_versions = [int(version.strip()) for version in api_versions]
1767   except (TypeError, ValueError), err:
1768     return False, ("API version(s) can't be converted to integer: %s" %
1771   return True, api_versions
1774 def DiagnoseOS(top_dirs=None):
# NOTE(review): numbered listing with elided lines (the "result = []"
# initialization, "try:" openers, the status branches and the final return
# are not all visible); comments only, code untouched.
1775   """Compute the validity for all OSes.
1777   @type top_dirs: list
1778   @param top_dirs: the list of directories in which to
1779       search (if not given defaults to
1780       L{constants.OS_SEARCH_PATH})
1781   @rtype: list of L{objects.OS}
1782   @return: a list of tuples (name, path, status, diagnose, variants,
1783       parameters, api_version) for all (potential) OSes under all
1784       search paths, where:
1785         - name is the (potential) OS name
1786         - path is the full path to the OS
1787         - status True/False is the validity of the OS
1788         - diagnose is the error message for an invalid OS, otherwise empty
1789         - variants is a list of supported OS variants, if any
1790         - parameters is a list of (name, help) parameters, if any
1791         - api_version is a list of support OS API versions
1794   if top_dirs is None:
1795     top_dirs = constants.OS_SEARCH_PATH
# An unreadable search directory is logged and skipped, not fatal.
1798   for dir_name in top_dirs:
1799     if os.path.isdir(dir_name):
1801       f_names = utils.ListVisibleFiles(dir_name)
1802     except EnvironmentError, err:
1803       logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1805     for name in f_names:
1806       os_path = utils.PathJoin(dir_name, name)
1807       status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
# Valid OS: pull variants/parameters/api versions from the OS object;
# invalid OS: os_inst is the diagnose message, lists stay empty.
1810       variants = os_inst.supported_variants
1811       parameters = os_inst.supported_parameters
1812       api_versions = os_inst.api_versions
1815       variants = parameters = api_versions = []
1816       result.append((name, os_path, status, diagnose, variants,
1817                      parameters, api_versions))
1822 def _TryOSFromDisk(name, base_dir=None):
# NOTE(review): numbered listing with elided lines (several "try:" openers,
# if/else branches and the trailing "return True, os_obj" are not visible);
# comments only, code untouched.
1823   """Create an OS instance from disk.
1825   This function will return an OS instance if the given name is a
1828   @type base_dir: string
1829   @keyword base_dir: Base directory containing OS installations.
1830                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1832   @return: success and either the OS instance if we find a valid one,
1836   if base_dir is None:
1837     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1839     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1842     return False, "Directory for OS %s not found in search path" % name
1844   status, api_versions = _OSOndiskAPIVersion(os_dir)
1847     return status, api_versions
# The on-disk API versions must overlap what this node supports.
1849   if not constants.OS_API_VERSIONS.intersection(api_versions):
1850     return False, ("API version mismatch for path '%s': found %s, want %s." %
1851                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1853   # OS Files dictionary, we will populate it with the absolute path names
1854   os_files = dict.fromkeys(constants.OS_SCRIPTS)
# Newer API levels require additional metadata files next to the scripts.
1856   if max(api_versions) >= constants.OS_API_V15:
1857     os_files[constants.OS_VARIANTS_FILE] = ''
1859   if max(api_versions) >= constants.OS_API_V20:
1860     os_files[constants.OS_PARAMETERS_FILE] = ''
1862     del os_files[constants.OS_SCRIPT_VERIFY]
# Every required file must exist, be a regular file, and (for scripts)
# be executable by the owner; otherwise the OS is reported invalid.
1864   for filename in os_files:
1865     os_files[filename] = utils.PathJoin(os_dir, filename)
1868       st = os.stat(os_files[filename])
1869     except EnvironmentError, err:
1870       return False, ("File '%s' under path '%s' is missing (%s)" %
1871                      (filename, os_dir, _ErrnoOrStr(err)))
1873     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1874       return False, ("File '%s' under path '%s' is not a regular file" %
1877     if filename in constants.OS_SCRIPTS:
1878       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1879         return False, ("File '%s' under path '%s' is not executable" %
1883   if constants.OS_VARIANTS_FILE in os_files:
1884     variants_file = os_files[constants.OS_VARIANTS_FILE]
1886       variants = utils.ReadFile(variants_file).splitlines()
1887     except EnvironmentError, err:
1888       return False, ("Error while reading the OS variants file at %s: %s" %
1889                      (variants_file, _ErrnoOrStr(err)))
1891       return False, ("No supported os variant found")
1894   if constants.OS_PARAMETERS_FILE in os_files:
1895     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1897       parameters = utils.ReadFile(parameters_file).splitlines()
1898     except EnvironmentError, err:
1899       return False, ("Error while reading the OS parameters file at %s: %s" %
1900                      (parameters_file, _ErrnoOrStr(err)))
# Each parameters line is "name help-text" split on first whitespace.
1901     parameters = [v.split(None, 1) for v in parameters]
1903   os_obj = objects.OS(name=name, path=os_dir,
1904                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1905                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1906                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1907                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1908                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1910                       supported_variants=variants,
1911                       supported_parameters=parameters,
1912                       api_versions=api_versions)
1916 def OSFromDisk(name, base_dir=None):
# NOTE(review): numbered listing with elided lines (the status check raising
# RPCFail and the final "return payload" are not visible); comments only.
1917   """Create an OS instance from disk.
1919   This function will return an OS instance if the given name is a
1920   valid OS name. Otherwise, it will raise an appropriate
1921   L{RPCFail} exception, detailing why this is not a valid OS.
1923   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1924   an exception but returns true/false status data.
1926   @type base_dir: string
1927   @keyword base_dir: Base directory containing OS installations.
1928                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1929   @rtype: L{objects.OS}
1930   @return: the OS instance if we find a valid one
1931   @raise RPCFail: if we don't find a valid OS
# Strip any "+variant" suffix before the on-disk lookup.
1934   name_only = objects.OS.GetName(name)
1935   status, payload = _TryOSFromDisk(name_only, base_dir)
1943 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
# NOTE(review): numbered listing with elided lines (the "result = {}"
# initialization, an apparent OS_VARIANT guard branch and the final return
# are not visible); comments only, code untouched.
1944   """Calculate the basic environment for an os script.
1947   @param os_name: full operating system name (including variant)
1948   @type inst_os: L{objects.OS}
1949   @param inst_os: operating system for which the environment is being built
1950   @type os_params: dict
1951   @param os_params: the OS parameters
1952   @type debug: integer
1953   @param debug: debug level (0 or 1, for OS Api 10)
1955   @return: dict of environment variables
1956   @raise errors.BlockDeviceError: if the block device
# Use the highest API version both sides support.
1962     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1963   result['OS_API_VERSION'] = '%d' % api_version
1964   result['OS_NAME'] = inst_os.name
1965   result['DEBUG_LEVEL'] = '%d' % debug
# OS variants exist only from API v15 on; default to the first supported
# variant when none is encoded in os_name.
1968   if api_version >= constants.OS_API_V15:
1969     variant = objects.OS.GetVariant(os_name)
1971       variant = inst_os.supported_variants[0]
1972     result['OS_VARIANT'] = variant
# Expose every OS parameter as an OSP_<NAME> environment variable.
1975   for pname, pvalue in os_params.items():
1976     result['OSP_%s' % pname.upper()] = pvalue
1981 def OSEnvironment(instance, inst_os, debug=0):
# NOTE(review): numbered listing with elided lines (e.g. a guard around
# NIC_%d_IP and the final return are not visible); comments only.
1982   """Calculate the environment for an os script.
1984   @type instance: L{objects.Instance}
1985   @param instance: target instance for the os script run
1986   @type inst_os: L{objects.OS}
1987   @param inst_os: operating system for which the environment is being built
1988   @type debug: integer
1989   @param debug: debug level (0 or 1, for OS Api 10)
1991   @return: dict of environment variables
1992   @raise errors.BlockDeviceError: if the block device
# Start from the generic OS environment, then layer instance specifics.
1996   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
1998   for attr in ["name", "os", "uuid", "ctime", "mtime"]:
1999     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2001   result['HYPERVISOR'] = instance.hypervisor
2002   result['DISK_COUNT'] = '%d' % len(instance.disks)
2003   result['NIC_COUNT'] = '%d' % len(instance.nics)
# Per-disk variables; each disk must be resolvable to a real device.
2006   for idx, disk in enumerate(instance.disks):
2007     real_disk = _OpenRealBD(disk)
2008     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2009     result['DISK_%d_ACCESS' % idx] = disk.mode
2010     if constants.HV_DISK_TYPE in instance.hvparams:
2011       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2012         instance.hvparams[constants.HV_DISK_TYPE]
2013     if disk.dev_type in constants.LDS_BLOCK:
2014       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2015     elif disk.dev_type == constants.LD_FILE:
2016       result['DISK_%d_BACKEND_TYPE' % idx] = \
2017         'file:%s' % disk.physical_id[0]
# Per-NIC variables; bridge/link only when the mode provides them.
2020   for idx, nic in enumerate(instance.nics):
2021     result['NIC_%d_MAC' % idx] = nic.mac
2023       result['NIC_%d_IP' % idx] = nic.ip
2024     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2025     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2026       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2027     if nic.nicparams[constants.NIC_LINK]:
2028       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2029     if constants.HV_NIC_TYPE in instance.hvparams:
2030       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2031         instance.hvparams[constants.HV_NIC_TYPE]
# Flatten backend and hypervisor parameters into the environment too.
2034   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2035     for key, value in source.items():
2036       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2041 def BlockdevGrow(disk, amount):
# NOTE(review): numbered listing with elided lines (the "try:" opener and
# the r_dev.Grow(amount) call itself are not visible); comments only.
2042   """Grow a stack of block devices.
2044   This function is called recursively, with the childrens being the
2045   first ones to resize.
2047   @type disk: L{objects.Disk}
2048   @param disk: the disk to be grown
2049   @rtype: (status, result)
2050   @return: a tuple with the status of the operation
2051     (True/False), and the errors message if status
2055   r_dev = _RecursiveFindBD(disk)
2057     _Fail("Cannot find block device %s", disk)
2061   except errors.BlockDeviceError, err:
2062     _Fail("Failed to grow block device: %s", err, exc=True)
2065 def BlockdevSnapshot(disk):
# NOTE(review): numbered listing; some lines may be elided. Comments only.
2066   """Create a snapshot copy of a block device.
2068   This function is called recursively, and the snapshot is actually created
2069   just for the leaf lvm backend device.
2071   @type disk: L{objects.Disk}
2072   @param disk: the disk to be snapshotted
2074   @return: snapshot disk path
# DRBD is only a mirroring layer: snapshot its (single used) backing child.
2077   if disk.dev_type == constants.LD_DRBD8:
2078     if not disk.children:
2079       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2081     return BlockdevSnapshot(disk.children[0])
2082   elif disk.dev_type == constants.LD_LV:
2083     r_dev = _RecursiveFindBD(disk)
2084     if r_dev is not None:
2085       # FIXME: choose a saner value for the snapshot size
2086       # let's stay on the safe side and ask for the full size, for now
2087       return r_dev.Snapshot(disk.size)
2089       _Fail("Cannot find block device %s", disk)
# Only LVM (possibly below DRBD) devices support snapshotting.
2091     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2092           disk.unique_id, disk.dev_type)
2095 def FinalizeExport(instance, snap_disks):
# NOTE(review): numbered listing with elided lines (e.g. nic_total/disk_total
# accounting and branches inside the loops are not all visible);
# comments only, code untouched.
2096   """Write out the export configuration information.
2098   @type instance: L{objects.Instance}
2099   @param instance: the instance which we export, used for
2100       saving configuration
2101   @type snap_disks: list of L{objects.Disk}
2102   @param snap_disks: list of snapshot block devices, which
2103       will be used to get the actual name of the dump file
# Build the export in a ".new" staging dir, then atomically swap it in
# at the end (rmtree + move).
2108   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2109   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2111   config = objects.SerializableConfigParser()
2113   config.add_section(constants.INISECT_EXP)
2114   config.set(constants.INISECT_EXP, 'version', '0')
2115   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2116   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2117   config.set(constants.INISECT_EXP, 'os', instance.os)
2118   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2120   config.add_section(constants.INISECT_INS)
2121   config.set(constants.INISECT_INS, 'name', instance.name)
2122   config.set(constants.INISECT_INS, 'memory', '%d' %
2123              instance.beparams[constants.BE_MEMORY])
2124   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2125              instance.beparams[constants.BE_VCPUS])
2126   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2127   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
# Serialize NIC data; nic_total is presumably accumulated in elided
# lines of this loop — TODO confirm against the full source.
2130   for nic_count, nic in enumerate(instance.nics):
2132     config.set(constants.INISECT_INS, 'nic%d_mac' %
2133                nic_count, '%s' % nic.mac)
2134     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2135     for param in constants.NICS_PARAMETER_TYPES:
2136       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2137                  '%s' % nic.nicparams.get(param, None))
2138   # TODO: redundant: on load can read nics until it doesn't exist
2139   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2142   for disk_count, disk in enumerate(snap_disks):
2145     config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2146                ('%s' % disk.iv_name))
2147     config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2148                ('%s' % disk.physical_id[1]))
2149     config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2152   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2154   # New-style hypervisor/backend parameters
2156   config.add_section(constants.INISECT_HYP)
2157   for name, value in instance.hvparams.items():
2158     if name not in constants.HVC_GLOBALS:
2159       config.set(constants.INISECT_HYP, name, str(value))
2161   config.add_section(constants.INISECT_BEP)
2162   for name, value in instance.beparams.items():
2163     config.set(constants.INISECT_BEP, name, str(value))
2165   config.add_section(constants.INISECT_OSP)
2166   for name, value in instance.osparams.items():
2167     config.set(constants.INISECT_OSP, name, str(value))
2169   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2170                   data=config.Dumps())
2171   shutil.rmtree(finaldestdir, ignore_errors=True)
2172   shutil.move(destdir, finaldestdir)
2175 def ExportInfo(dest):
# NOTE(review): numbered listing with elided lines (the config.read(...)
# call between construction and validation is not visible); comments only.
2176   """Get export configuration information.
2179   @param dest: directory containing the export
2181   @rtype: L{objects.SerializableConfigParser}
2182   @return: a serializable config file containing the
2186   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2188   config = objects.SerializableConfigParser()
# A well-formed export must carry both the export and instance sections.
2191   if (not config.has_section(constants.INISECT_EXP) or
2192       not config.has_section(constants.INISECT_INS)):
2193     _Fail("Export info file doesn't have the required fields")
2195   return config.Dumps()
# NOTE(review): the def line of this function is elided from the listing;
# from the docstring this is presumably the exports-listing RPC entry
# (ListExports?) — TODO confirm. Comments only, code untouched.
2199   """Return a list of exports currently available on this machine.
2202   @return: list of the exports
2205   if os.path.isdir(constants.EXPORT_DIR):
2206     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2208     _Fail("No exports directory")
2211 def RemoveExport(export):
# NOTE(review): numbered listing with elided lines (the "try:" opener before
# shutil.rmtree is not visible); comments only, code untouched.
2212   """Remove an existing export from the node.
2215   @param export: the name of the export to remove
# The export name is resolved inside EXPORT_DIR and removed recursively;
# filesystem errors are surfaced as RPCFail.
2219   target = utils.PathJoin(constants.EXPORT_DIR, export)
2222     shutil.rmtree(target)
2223   except EnvironmentError, err:
2224     _Fail("Error while removing the export: %s", err, exc=True)
2228 def BlockdevRename(devlist):
# NOTE(review): numbered listing with elided lines (the "msgs = []"
# initialization and the "try:" opener before the rename are not visible);
# comments only, code untouched.
2229   """Rename a list of block devices.
2230
2231   @type devlist: list of tuples
2231   @param devlist: list of tuples of the form (disk,
2232       new_logical_id, new_physical_id); disk is an
2233       L{objects.Disk} object describing the current disk,
2234       and new logical_id/physical_id is the name we
2237   @return: True if all renames succeeded, False otherwise
2242   for disk, unique_id in devlist:
2243     dev = _RecursiveFindBD(disk)
2245       msgs.append("Can't find device %s in rename" % str(disk))
# A changed device path invalidates the cached entry for the old path.
2249       old_rpath = dev.dev_path
2250       dev.Rename(unique_id)
2251       new_rpath = dev.dev_path
2252       if old_rpath != new_rpath:
2253         DevCacheManager.RemoveCache(old_rpath)
2254         # FIXME: we should add the new cache information here, like:
2255         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2256         # but we don't have the owner here - maybe parse from existing
2257         # cache? for now, we only lose lvm data when we rename, which
2258         # is less critical than DRBD or MD
2259     except errors.BlockDeviceError, err:
2260       msgs.append("Can't rename device '%s' to '%s': %s" %
2261                   (dev, unique_id, err))
2262       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
# All accumulated errors are reported in a single RPCFail.
2265     _Fail("; ".join(msgs))
2268 def _TransformFileStorageDir(file_storage_dir):
# NOTE(review): numbered listing with elided lines (the "cfg = ..." binding
# used below — presumably an ssconf.SimpleStore instance — is not visible;
# TODO confirm). Comments only, code untouched.
2269   """Checks whether given file_storage_dir is valid.
2271   Checks whether the given file_storage_dir is within the cluster-wide
2272   default file_storage_dir stored in SimpleStore. Only paths under that
2273   directory are allowed.
2275   @type file_storage_dir: str
2276   @param file_storage_dir: the path to check
2278   @return: the normalized path if valid, None otherwise
2281   if not constants.ENABLE_FILE_STORAGE:
2282     _Fail("File storage disabled at configure time")
# The normalized path must live under the cluster-wide base directory;
# commonprefix is used for the containment check.
2284   file_storage_dir = os.path.normpath(file_storage_dir)
2285   base_file_storage_dir = cfg.GetFileStorageDir()
2286   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2287       base_file_storage_dir):
2288     _Fail("File storage directory '%s' is not under base file"
2289           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2290   return file_storage_dir
2293 def CreateFileStorageDir(file_storage_dir):
# NOTE(review): numbered listing with elided lines (the else/"try:" around
# os.makedirs and part of the _Fail argument list are not visible);
# comments only, code untouched.
2294   """Create file storage directory.
2296   @type file_storage_dir: str
2297   @param file_storage_dir: directory to create
2300   @return: tuple with first element a boolean indicating whether dir
2301     creation was successful or not
# Path is validated/normalized against the cluster base dir first.
2304   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2305   if os.path.exists(file_storage_dir):
2306     if not os.path.isdir(file_storage_dir):
2307       _Fail("Specified storage dir '%s' is not a directory",
2311       os.makedirs(file_storage_dir, 0750)
2312     except OSError, err:
2313       _Fail("Cannot create file storage directory '%s': %s",
2314             file_storage_dir, err, exc=True)
2317 def RemoveFileStorageDir(file_storage_dir):
2318 """Remove file storage directory.
2320 Remove it only if it's empty. If not log an error and return.
2322 @type file_storage_dir: str
2323 @param file_storage_dir: the directory we should cleanup
2324 @rtype: tuple (success,)
2325 @return: tuple of one element, C{success}, denoting
2326 whether the operation was successful
2329 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2330 if os.path.exists(file_storage_dir):
2331 if not os.path.isdir(file_storage_dir):
2332 _Fail("Specified Storage directory '%s' is not a directory",
2334 # deletes dir only if empty, otherwise we want to fail the rpc call
2336 os.rmdir(file_storage_dir)
2337 except OSError, err:
2338 _Fail("Cannot remove file storage directory '%s': %s",
2339 file_storage_dir, err)
2342 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2343 """Rename the file storage directory.
2345 @type old_file_storage_dir: str
2346 @param old_file_storage_dir: the current path
2347 @type new_file_storage_dir: str
2348 @param new_file_storage_dir: the name we should rename to
2349 @rtype: tuple (success,)
2350 @return: tuple of one element, C{success}, denoting
2351 whether the operation was successful
2354 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2355 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2356 if not os.path.exists(new_file_storage_dir):
2357 if os.path.isdir(old_file_storage_dir):
2359 os.rename(old_file_storage_dir, new_file_storage_dir)
2360 except OSError, err:
2361 _Fail("Cannot rename '%s' to '%s': %s",
2362 old_file_storage_dir, new_file_storage_dir, err)
2364 _Fail("Specified storage dir '%s' is not a directory",
2365 old_file_storage_dir)
2367 if os.path.exists(old_file_storage_dir):
2368 _Fail("Cannot rename '%s' to '%s': both locations exist",
2369 old_file_storage_dir, new_file_storage_dir)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check

  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # prefix check keeps job-queue RPCs from touching arbitrary paths
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents

  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name

  @rtype: tuple
  @return: the success of the operation and payload

  """
  # both source and target must live inside the queue directory
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)
2430 def BlockdevClose(instance_name, disks):
2431 """Closes the given block devices.
2433 This means they will be switched to secondary mode (in case of
2436 @param instance_name: if the argument is not empty, the symlinks
2437 of this instance will be removed
2438 @type disks: list of L{objects.Disk}
2439 @param disks: the list of disks to be closed
2440 @rtype: tuple (success, message)
2441 @return: a tuple of success and message, where success
2442 indicates the succes of the operation, and message
2443 which will contain the error details in case we
2449 rd = _RecursiveFindBD(cf)
2451 _Fail("Can't find device %s", cf)
2458 except errors.BlockDeviceError, err:
2459 msg.append(str(err))
2461 _Fail("Can't make devices secondary: %s", ",".join(msg))
2464 _RemoveBlockDevLinks(instance_name, disks)
2467 def ValidateHVParams(hvname, hvparams):
2468 """Validates the given hypervisor parameters.
2470 @type hvname: string
2471 @param hvname: the hypervisor name
2472 @type hvparams: dict
2473 @param hvparams: the hypervisor parameters to be validated
2478 hv_type = hypervisor.GetHypervisor(hvname)
2479 hv_type.ValidateParameters(hvparams)
2480 except errors.HypervisorError, err:
2481 _Fail(str(err), log=False)
def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  # supported_parameters is a list of (name, description) pairs
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  # OSes older than API v20 have no parameter support; nothing to check
  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True
2549 """Demotes the current node from master candidate role.
2552 # try to ensure we're not the master by mistake
2553 master, myself = ssconf.GetMasterAndMyself()
2554 if master == myself:
2555 _Fail("ssconf status shows I'm the master node, will not demote")
2557 result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2558 if not result.failed:
2559 _Fail("The master daemon is running, will not demote")
2562 if os.path.isfile(constants.CLUSTER_CONF_FILE):
2563 utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2564 except EnvironmentError, err:
2565 if err.errno != errno.ENOENT:
2566 _Fail("Error while backing up cluster file: %s", err, exc=True)
2568 utils.RemoveFile(constants.CLUSTER_CONF_FILE)
def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  @type cryptodir: str
  @param cryptodir: base directory holding per-certificate subdirectories
  @type name: str
  @param name: certificate (subdirectory) name
  @rtype: tuple; (string, string, string)
  @return: certificate directory, key file path and certificate file path

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2580 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2581 """Creates a new X509 certificate for SSL/TLS.
2584 @param validity: Validity in seconds
2585 @rtype: tuple; (string, string)
2586 @return: Certificate name and public part
2589 (key_pem, cert_pem) = \
2590 utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2591 min(validity, _MAX_SSL_CERT_VALIDITY))
2593 cert_dir = tempfile.mkdtemp(dir=cryptodir,
2594 prefix="x509-%s-" % utils.TimestampForFilename())
2596 name = os.path.basename(cert_dir)
2597 assert len(name) > 5
2599 (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2601 utils.WriteFile(key_file, mode=0400, data=key_pem)
2602 utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2604 # Never return private key as it shouldn't leave the node
2605 return (name, cert_pem)
2607 shutil.rmtree(cert_dir, ignore_errors=True)
2611 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2612 """Removes a X509 certificate.
2615 @param name: Certificate name
2618 (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2620 utils.RemoveFile(key_file)
2621 utils.RemoveFile(cert_file)
2625 except EnvironmentError, err:
2626 _Fail("Cannot remove certificate directory '%s': %s",
2630 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2631 """Returns the command for the requested input/output.
2633 @type instance: L{objects.Instance}
2634 @param instance: The instance object
2635 @param mode: Import/export mode
2636 @param ieio: Input/output type
2637 @param ieargs: Input/output arguments
2640 assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2647 if ieio == constants.IEIO_FILE:
2648 (filename, ) = ieargs
2650 if not utils.IsNormAbsPath(filename):
2651 _Fail("Path '%s' is not normalized or absolute", filename)
2653 directory = os.path.normpath(os.path.dirname(filename))
2655 if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2656 constants.EXPORT_DIR):
2657 _Fail("File '%s' is not under exports directory '%s'",
2658 filename, constants.EXPORT_DIR)
2661 utils.Makedirs(directory, mode=0750)
2663 quoted_filename = utils.ShellQuote(filename)
2665 if mode == constants.IEM_IMPORT:
2666 suffix = "> %s" % quoted_filename
2667 elif mode == constants.IEM_EXPORT:
2668 suffix = "< %s" % quoted_filename
2670 # Retrieve file size
2672 st = os.stat(filename)
2673 except EnvironmentError, err:
2674 logging.error("Can't stat(2) %s: %s", filename, err)
2676 exp_size = utils.BytesToMebibyte(st.st_size)
2678 elif ieio == constants.IEIO_RAW_DISK:
2681 real_disk = _OpenRealBD(disk)
2683 if mode == constants.IEM_IMPORT:
2684 # we set here a smaller block size as, due to transport buffering, more
2685 # than 64-128k will mostly ignored; we use nocreat to fail if the device
2686 # is not already there or we pass a wrong path; we use notrunc to no
2687 # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2688 # much memory; this means that at best, we flush every 64k, which will
2690 suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2691 " bs=%s oflag=dsync"),
2695 elif mode == constants.IEM_EXPORT:
2696 # the block size on the read dd is 1MiB to match our units
2697 prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2699 str(1024 * 1024), # 1 MB
2701 exp_size = disk.size
2703 elif ieio == constants.IEIO_SCRIPT:
2704 (disk, disk_index, ) = ieargs
2706 assert isinstance(disk_index, (int, long))
2708 real_disk = _OpenRealBD(disk)
2710 inst_os = OSFromDisk(instance.os)
2711 env = OSEnvironment(instance, inst_os)
2713 if mode == constants.IEM_IMPORT:
2714 env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2715 env["IMPORT_INDEX"] = str(disk_index)
2716 script = inst_os.import_script
2718 elif mode == constants.IEM_EXPORT:
2719 env["EXPORT_DEVICE"] = real_disk.dev_path
2720 env["EXPORT_INDEX"] = str(disk_index)
2721 script = inst_os.export_script
2723 # TODO: Pass special environment only to script
2724 script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2726 if mode == constants.IEM_IMPORT:
2727 suffix = "| %s" % script_cmd
2729 elif mode == constants.IEM_EXPORT:
2730 prefix = "%s |" % script_cmd
2732 # Let script predict size
2733 exp_size = constants.IE_CUSTOM_SIZE
2736 _Fail("Invalid %s I/O mode %r", mode, ieio)
2738 return (env, prefix, suffix, exp_size)
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  @type prefix: string
  @param prefix: either "import" or "export", used in the directory name
  @rtype: string
  @return: path of the newly created, unique status directory

  """
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))
2750 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2751 """Starts an import or export daemon.
2753 @param mode: Import/output mode
2754 @type opts: L{objects.ImportExportOptions}
2755 @param opts: Daemon options
2757 @param host: Remote host for export (None for import)
2759 @param port: Remote port for export (None for import)
2760 @type instance: L{objects.Instance}
2761 @param instance: Instance object
2762 @param ieio: Input/output type
2763 @param ieioargs: Input/output arguments
2766 if mode == constants.IEM_IMPORT:
2769 if not (host is None and port is None):
2770 _Fail("Can not specify host or port on import")
2772 elif mode == constants.IEM_EXPORT:
2775 if host is None or port is None:
2776 _Fail("Host and port must be specified for an export")
2779 _Fail("Invalid mode %r", mode)
2781 if (opts.key_name is None) ^ (opts.ca_pem is None):
2782 _Fail("Cluster certificate can only be used for both key and CA")
2784 (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2785 _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2787 if opts.key_name is None:
2789 key_path = constants.NODED_CERT_FILE
2790 cert_path = constants.NODED_CERT_FILE
2791 assert opts.ca_pem is None
2793 (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2795 assert opts.ca_pem is not None
2797 for i in [key_path, cert_path]:
2798 if not os.path.exists(i):
2799 _Fail("File '%s' does not exist" % i)
2801 status_dir = _CreateImportExportStatusDir(prefix)
2803 status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2804 pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2805 ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2807 if opts.ca_pem is None:
2809 ca = utils.ReadFile(constants.NODED_CERT_FILE)
2814 utils.WriteFile(ca_file, data=ca, mode=0400)
2817 constants.IMPORT_EXPORT_DAEMON,
2819 "--key=%s" % key_path,
2820 "--cert=%s" % cert_path,
2821 "--ca=%s" % ca_file,
2825 cmd.append("--host=%s" % host)
2828 cmd.append("--port=%s" % port)
2831 cmd.append("--compress=%s" % opts.compress)
2834 cmd.append("--magic=%s" % opts.magic)
2836 if exp_size is not None:
2837 cmd.append("--expected-size=%s" % exp_size)
2840 cmd.append("--cmd-prefix=%s" % cmd_prefix)
2843 cmd.append("--cmd-suffix=%s" % cmd_suffix)
2845 logfile = _InstanceLogName(prefix, instance.os, instance.name)
2847 # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2848 # support for receiving a file descriptor for output
2849 utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2852 # The import/export name is simply the status directory name
2853 return os.path.basename(status_dir)
2856 shutil.rmtree(status_dir, ignore_errors=True)
2860 def GetImportExportStatus(names):
2861 """Returns import/export daemon status.
2863 @type names: sequence
2864 @param names: List of names
2865 @rtype: List of dicts
2866 @return: Returns a list of the state of each named import/export or None if a
2867 status couldn't be read
2873 status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2877 data = utils.ReadFile(status_file)
2878 except EnvironmentError, err:
2879 if err.errno != errno.ENOENT:
2887 result.append(serializer.LoadJson(data))
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  @type name: string
  @param name: Import/export (status directory) name

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    # the process may already be gone; that's not an error here
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  @type name: string
  @param name: Import/export (status directory) name

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @param nodes_ip: mapping of node name to IP, passed to
      L{objects.Disk.SetPhysicalID}
  @type disks: list of L{objects.Disk}
  @param disks: the disks to look up
  @return: list of the found block devices

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs
2947 def DrbdDisconnectNet(nodes_ip, disks):
2948 """Disconnects the network on a list of drbd devices.
2951 bdevs = _FindDisks(nodes_ip, disks)
2957 except errors.BlockDeviceError, err:
2958 _Fail("Can't change network configuration to standalone mode: %s",
2962 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2963 """Attaches the network on a list of drbd devices.
2966 bdevs = _FindDisks(nodes_ip, disks)
2969 for idx, rd in enumerate(bdevs):
2971 _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2972 except EnvironmentError, err:
2973 _Fail("Can't create symlink: %s", err)
2974 # reconnect disks, switch to new master configuration and if
2975 # needed primary mode
2978 rd.AttachNet(multimaster)
2979 except errors.BlockDeviceError, err:
2980 _Fail("Can't change network configuration: %s", err)
2982 # wait until the disks are connected; we need to retry the re-attach
2983 # if the device becomes standalone, as this might happen if the one
2984 # node disconnects and reconnects in a different mode before the
2985 # other node reconnects; in this case, one or both of the nodes will
2986 # decide it has wrong configuration and switch to standalone
2989 all_connected = True
2992 stats = rd.GetProcStatus()
2994 all_connected = (all_connected and
2995 (stats.is_connected or stats.is_in_resync))
2997 if stats.is_standalone:
2998 # peer had different config info and this node became
2999 # standalone, even though this should not happen with the
3000 # new staged way of changing disk configs
3002 rd.AttachNet(multimaster)
3003 except errors.BlockDeviceError, err:
3004 _Fail("Can't change network configuration: %s", err)
3006 if not all_connected:
3007 raise utils.RetryAgain()
3010 # Start with a delay of 100 miliseconds and go up to 5 seconds
3011 utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3012 except utils.RetryTimeout:
3013 _Fail("Timeout in disk reconnecting")
3016 # change to primary mode
3020 except errors.BlockDeviceError, err:
3021 _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @param nodes_ip: mapping of node name to IP (see L{_FindDisks})
  @param disks: the disks to wait on
  @rtype: tuple (boolean, int)
  @return: whether all devices finished resyncing, and the minimum
      sync percentage seen across the devices

  """
  def _helper(rd):
    # raise RetryAgain while the device is neither connected nor syncing
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check before considering this a hard failure
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
3054 def GetDrbdUsermodeHelper():
3055 """Returns DRBD usermode helper currently configured.
3059 return bdev.BaseDRBD.GetUsermodeHelper()
3060 except errors.BlockDeviceError, err:
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  @type hypervisor_type: string
  @param hypervisor_type: hypervisor whose powercycle method to use

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # parent: reply to the RPC and let the child do the powercycle
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running on ram
  try:
    utils.Mlockall()
  except Exception: # pylint: disable-msg=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # pass the input via a temporary file, the script expects a file name
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
3210 class DevCacheManager(object):
3211 """Simple class for managing a cache of block device information.
3214 _DEV_PREFIX = "/dev/"
3215 _ROOT_DIR = constants.BDEV_CACHE_DIR
3218 def _ConvertPath(cls, dev_path):
3219 """Converts a /dev/name path to the cache file name.
3221 This replaces slashes with underscores and strips the /dev
3222 prefix. It then returns the full path to the cache file.
3225 @param dev_path: the C{/dev/} path name
3227 @return: the converted path name
3230 if dev_path.startswith(cls._DEV_PREFIX):
3231 dev_path = dev_path[len(cls._DEV_PREFIX):]
3232 dev_path = dev_path.replace("/", "_")
3233 fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3237 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3238 """Updates the cache information for a given device.
3241 @param dev_path: the pathname of the device
3243 @param owner: the owner (instance name) of the device
3244 @type on_primary: bool
3245 @param on_primary: whether this is the primary
3248 @param iv_name: the instance-visible name of the
3249 device, as in objects.Disk.iv_name
3254 if dev_path is None:
3255 logging.error("DevCacheManager.UpdateCache got a None dev_path")
3257 fpath = cls._ConvertPath(dev_path)
3263 iv_name = "not_visible"
3264 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3266 utils.WriteFile(fpath, data=fdata)
3267 except EnvironmentError, err:
3268 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3271 def RemoveCache(cls, dev_path):
3272 """Remove data for a dev_path.
3274 This is just a wrapper over L{utils.RemoveFile} with a converted
3275 path name and logging.
3278 @param dev_path: the pathname of the device
3283 if dev_path is None:
3284 logging.error("DevCacheManager.RemoveCache got a None dev_path")
3286 fpath = cls._ConvertPath(dev_path)
3288 utils.RemoveFile(fpath)
3289 except EnvironmentError, err:
3290 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)