4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25 the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27 in the L{_CleanDirectory} function
31 # pylint: disable-msg=E1103
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
# NOTE(review): this extract is missing interior lines (the embedded
# original line numbers skip); kept byte-identical -- reconcile with the
# full source before editing.
# Kernel-provided boot ID; re-read to detect node reboots.
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
# Whitelist of directories _CleanDirectory may operate on.
# NOTE(review): the frozenset literal is truncated in this copy (entries
# and the closing bracket are missing) -- confirm against full source.
66 _ALLOWED_CLEAN_DIRS = frozenset([
68 constants.JOB_QUEUE_ARCHIVE_DIR,
70 constants.CRYPTO_KEYS_DIR,
# Maximum validity of generated SSL certs: 7 days, in seconds.
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
# File names used inside a per-transfer X509 directory.
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
# Presumably the import/export ("IES") status file name -- verify.
75 _IES_STATUS_FILE = "status"
class RPCFail(Exception):
  """Exception signalling an RPC-level failure.

  Its argument is the error message.

  """
def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail: always

  """
  if args:
    msg = msg % args
  # "log" keyword defaults to True; "exc" selects traceback logging
  if "log" not in kwargs or kwargs["log"]:
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
112 """Simple wrapper to return a SimpleStore.
114 @rtype: L{ssconf.SimpleStore}
115 @return: a SimpleStore instance
118 return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  # refuse to touch anything outside the vetted set of directories
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    # only plain files are removed; directories and symlinks are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
# NOTE(review): this extract is missing interior lines (the embedded
# original line numbers skip, e.g. 192, 199-200); kept byte-identical.
# The upload whitelist below is therefore likely incomplete here --
# reconcile with the full source before editing.
184 def _BuildUploadFileList():
185 """Build the list of allowed upload files.
187 This is abstracted so that it's built only once at module import time.
# Static cluster-wide files that UploadFile may overwrite.
190 allowed_files = set([
191 constants.CLUSTER_CONF_FILE,
193 constants.SSH_KNOWN_HOSTS_FILE,
194 constants.VNC_PASSWORD_FILE,
195 constants.RAPI_CERT_FILE,
196 constants.RAPI_USERS_FILE,
197 constants.CONFD_HMAC_KEY,
198 constants.CLUSTER_DOMAIN_SECRET_FILE,
# Each hypervisor contributes its own ancillary files to the whitelist.
201 for hv_name in constants.HYPER_TYPES:
202 hv_class = hypervisor.GetHypervisorClass(hv_name)
203 allowed_files.update(hv_class.GetAncillaryFiles())
205 return frozenset(allowed_files)
# Computed once at module import time; consumed by UploadFile.
208 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
212 """Removes job queue files and archived jobs.
218 _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
219 _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
223 """Returns master information.
225 This is an utility function to compute master information, either
226 for consumption here or from the node daemon.
229 @return: master_netdev, master_ip, master_name, primary_ip_family
230 @raise RPCFail: in case of errors
235 master_netdev = cfg.GetMasterNetdev()
236 master_ip = cfg.GetMasterIP()
237 master_node = cfg.GetMasterNode()
238 primary_ip_family = cfg.GetPrimaryIPFamily()
239 except errors.ConfigurationError, err:
240 _Fail("Cluster configuration incomplete: %s", err, exc=True)
241 return (master_netdev, master_ip, master_node, primary_ip_family)
# NOTE(review): this extract is missing interior lines (the embedded
# original line numbers skip: err_msgs init, try/else branches, etc.);
# kept byte-identical -- reconcile with full source before editing.
244 def StartMaster(start_daemons, no_voting):
245 """Activate local node as master node.
247 The function will either try activate the IP address of the master
248 (unless someone else has it) or also start the master daemons, based
249 on the start_daemons parameter.
251 @type start_daemons: boolean
252 @param start_daemons: whether to start the master daemons
253 (ganeti-masterd and ganeti-rapi), or (if false) activate the
255 @type no_voting: boolean
256 @param no_voting: whether to start ganeti-masterd without a node vote
257 (if start_daemons is True), but still non-interactively
261 # GetMasterInfo will raise an exception if not able to return data
262 master_netdev, master_ip, _, family = GetMasterInfo()
265 # either start the master and rapi daemons
268 masterd_args = "--no-voting --yes-do-it"
273 "EXTRA_MASTERD_ARGS": masterd_args,
276 result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
278 msg = "Can't start Ganeti master: %s" % result.output
# Otherwise: only activate the master IP on this node's netdev.
283 if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
284 if netutils.IPAddress.Own(master_ip):
285 # we already have the ip:
286 logging.debug("Master IP already configured, doing nothing")
288 msg = "Someone else has the master ip, not activating"
# Pick IPv4 vs IPv6 helper class based on the cluster's primary family.
292 ipcls = netutils.IP4Address
293 if family == netutils.IP6Address.family:
294 ipcls = netutils.IP6Address
296 result = utils.RunCmd(["ip", "address", "add",
297 "%s/%d" % (master_ip, ipcls.iplen),
298 "dev", master_netdev, "label",
299 "%s:0" % master_netdev])
301 msg = "Can't activate master IP: %s" % result.output
# Gratuitous ARP / neighbor discovery so peers update their caches.
305 # we ignore the exit code of the following cmds
306 if ipcls == netutils.IP4Address:
307 utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
308 master_ip, master_ip])
309 elif ipcls == netutils.IP6Address:
311 utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
312 except errors.OpExecError:
313 # TODO: Better error reporting
314 logging.warning("Can't execute ndisc6, please install if missing")
# Any accumulated error messages fail the RPC at the end.
317 _Fail("; ".join(err_msgs))
# NOTE(review): this extract is missing interior lines (the embedded
# original line numbers skip); kept byte-identical.
320 def StopMaster(stop_daemons):
321 """Deactivate this node as master.
323 The function will always try to deactivate the IP address of the
324 master. It will also stop the master daemons depending on the
325 stop_daemons parameter.
327 @type stop_daemons: boolean
328 @param stop_daemons: whether to also stop the master daemons
329 (ganeti-masterd and ganeti-rapi)
333 # TODO: log and report back to the caller the error failures; we
334 # need to decide in which case we fail the RPC for this
336 # GetMasterInfo will raise an exception if not able to return data
337 master_netdev, master_ip, _, family = GetMasterInfo()
# Select address class matching the cluster's primary IP family.
339 ipcls = netutils.IP4Address
340 if family == netutils.IP6Address.family:
341 ipcls = netutils.IP6Address
# Remove the master IP label from the netdev; failure is only logged.
343 result = utils.RunCmd(["ip", "address", "del",
344 "%s/%d" % (master_ip, ipcls.iplen),
345 "dev", master_netdev])
347 logging.error("Can't remove the master IP, error: %s", result.output)
348 # but otherwise ignore the failure
# Stop masterd/rapi via daemon-util when requested (guard line missing
# in this copy, presumably "if stop_daemons:").
351 result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
353 logging.error("Could not stop Ganeti master, command %s had exitcode %s"
355 result.cmd, result.exit_code, result.output)
def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      # BUG FIX: the original called RPCFail(...), which only constructs
      # the exception object without raising it, so the parameter-
      # validation errors were silently dropped. Use _Fail, which logs
      # and raises, consistent with the rest of this module.
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " given")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")
# NOTE(review): this extract is missing interior lines (the embedded
# original line numbers skip: the modify_ssh_setup guard, try: lines);
# kept byte-identical.
380 def LeaveCluster(modify_ssh_setup):
381 """Cleans up and remove the current node.
383 This function cleans up and prepares the current node to be removed
386 If processing is successful, then it raises an
387 L{errors.QuitGanetiException} which is used as a special case to
388 shutdown the node daemon.
390 @param modify_ssh_setup: boolean
# Wipe node-local state and crypto material.
393 _CleanDirectory(constants.DATA_DIR)
394 _CleanDirectory(constants.CRYPTO_KEYS_DIR)
# Best-effort removal of this node's SSH keys (guarded by
# modify_ssh_setup in the full source -- guard line missing here).
399 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
401 utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
403 utils.RemoveFile(priv_key)
404 utils.RemoveFile(pub_key)
405 except errors.OpExecError:
406 logging.exception("Error while processing ssh files")
# Best-effort removal of cluster secrets; errors only logged.
409 utils.RemoveFile(constants.CONFD_HMAC_KEY)
410 utils.RemoveFile(constants.RAPI_CERT_FILE)
411 utils.RemoveFile(constants.NODED_CERT_FILE)
412 except: # pylint: disable-msg=W0702
413 logging.exception("Error while removing cluster secrets")
415 result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
417 logging.error("Command %s failed with exitcode %s and error %s",
418 result.cmd, result.exit_code, result.output)
420 # Raise a custom exception (handled in ganeti-noded)
421 raise errors.QuitGanetiException(True, 'Shutdown scheduled')
# NOTE(review): this extract is missing interior lines (result-dict
# initialisation, the vginfo guard, the final return); kept
# byte-identical.
424 def GetNodeInfo(vgname, hypervisor_type):
425 """Gives back a hash with different information about the node.
427 @type vgname: C{string}
428 @param vgname: the name of the volume group to ask for disk space information
429 @type hypervisor_type: C{str}
430 @param hypervisor_type: the name of the hypervisor to ask for
433 @return: dictionary with the following keys:
434 - vg_size is the size of the configured volume group in MiB
435 - vg_free is the free size of the volume group in MiB
436 - memory_dom0 is the memory allocated for domain0 in MiB
437 - memory_free is the currently available (free) ram in MiB
438 - memory_total is the total number of ram in MiB
# LVM volume group capacity; None when the VG cannot be queried.
443 vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
444 vg_free = vg_size = None
446 vg_free = int(round(vginfo[0][0], 0))
447 vg_size = int(round(vginfo[0][1], 0))
449 outputarray['vg_size'] = vg_size
450 outputarray['vg_free'] = vg_free
# Merge in hypervisor-reported node data (memory figures etc.).
452 hyper = hypervisor.GetHypervisor(hypervisor_type)
453 hyp_info = hyper.GetNodeInfo()
454 if hyp_info is not None:
455 outputarray.update(hyp_info)
# Boot ID lets the master detect that this node has rebooted.
457 outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
# NOTE(review): this extract is missing interior lines (result-dict
# init, try:/else: lines, several assignments); kept byte-identical --
# reconcile with the full source before editing.
462 def VerifyNode(what, cluster_name):
463 """Verify the status of the local node.
465 Based on the input L{what} parameter, various checks are done on the
468 If the I{filelist} key is present, this list of
469 files is checksummed and the file/checksum pairs are returned.
471 If the I{nodelist} key is present, we check that we have
472 connectivity via ssh with the target nodes (and check the hostname
475 If the I{node-net-test} key is present, we check that we have
476 connectivity to the given nodes via both primary IP and, if
477 applicable, secondary IPs.
480 @param what: a dictionary of things to check:
481 - filelist: list of files for which to compute checksums
482 - nodelist: list of nodes we should check ssh communication with
483 - node-net-test: list of nodes we should check node daemon port
485 - hypervisor: list with hypervisors to run the verify for
487 @return: a dictionary with the same keys as the input dict, and
488 values representing the result of the checks
492 my_name = netutils.Hostname.GetSysName()
493 port = netutils.GetDaemonPort(constants.NODED)
# Per-hypervisor verification; errors are reported as strings.
495 if constants.NV_HYPERVISOR in what:
496 result[constants.NV_HYPERVISOR] = tmp = {}
497 for hv_name in what[constants.NV_HYPERVISOR]:
499 val = hypervisor.GetHypervisor(hv_name).Verify()
500 except errors.HypervisorError, err:
501 val = "Error while checking hypervisor: %s" % str(err)
# Checksum the requested files.
504 if constants.NV_FILELIST in what:
505 result[constants.NV_FILELIST] = utils.FingerprintFiles(
506 what[constants.NV_FILELIST])
# SSH connectivity / hostname check against (shuffled) peer nodes.
508 if constants.NV_NODELIST in what:
509 result[constants.NV_NODELIST] = tmp = {}
510 random.shuffle(what[constants.NV_NODELIST])
511 for node in what[constants.NV_NODELIST]:
512 success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
# TCP reachability of peers' primary and secondary IPs.
516 if constants.NV_NODENETTEST in what:
517 result[constants.NV_NODENETTEST] = tmp = {}
518 my_pip = my_sip = None
519 for name, pip, sip in what[constants.NV_NODENETTEST]:
525 tmp[my_name] = ("Can't find my own primary/secondary IP"
528 for name, pip, sip in what[constants.NV_NODENETTEST]:
530 if not netutils.TcpPing(pip, port, source=my_pip):
531 fail.append("primary")
533 if not netutils.TcpPing(sip, port, source=my_sip):
534 fail.append("secondary")
536 tmp[name] = ("failure using the %s interface(s)" %
539 if constants.NV_MASTERIP in what:
540 # FIXME: add checks on incoming data structures (here and in the
541 # rest of the function)
542 master_name, master_ip = what[constants.NV_MASTERIP]
543 if master_name == my_name:
544 source = constants.IP4_ADDRESS_LOCALHOST
547 result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
# LV listing; GetVolumeList failures are returned as a message.
550 if constants.NV_LVLIST in what:
552 val = GetVolumeList(what[constants.NV_LVLIST])
555 result[constants.NV_LVLIST] = val
557 if constants.NV_INSTANCELIST in what:
558 # GetInstanceList can fail
560 val = GetInstanceList(what[constants.NV_INSTANCELIST])
563 result[constants.NV_INSTANCELIST] = val
565 if constants.NV_VGLIST in what:
566 result[constants.NV_VGLIST] = utils.ListVolumeGroups()
568 if constants.NV_PVLIST in what:
569 result[constants.NV_PVLIST] = \
570 bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
571 filter_allocatable=False)
573 if constants.NV_VERSION in what:
574 result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
575 constants.RELEASE_VERSION)
577 if constants.NV_HVINFO in what:
578 hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
579 result[constants.NV_HVINFO] = hyper.GetNodeInfo()
# DRBD minors in use; on error an error string is returned instead.
581 if constants.NV_DRBDLIST in what:
583 used_minors = bdev.DRBD8.GetUsedDevs().keys()
584 except errors.BlockDeviceError, err:
585 logging.warning("Can't get used minors list", exc_info=True)
586 used_minors = str(err)
587 result[constants.NV_DRBDLIST] = used_minors
589 if constants.NV_DRBDHELPER in what:
592 payload = bdev.BaseDRBD.GetUsermodeHelper()
593 except errors.BlockDeviceError, err:
594 logging.error("Can't get DRBD usermode helper: %s", str(err))
597 result[constants.NV_DRBDHELPER] = (status, payload)
# Sanity checks on /sys and /proc mounts.
599 if constants.NV_NODESETUP in what:
600 result[constants.NV_NODESETUP] = tmpr = []
601 if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
602 tmpr.append("The sysfs filesytem doesn't seem to be mounted"
603 " under /sys, missing required directories /sys/block"
604 " and /sys/class/net")
605 if (not os.path.isdir("/proc/sys") or
606 not os.path.isfile("/proc/sysrq-trigger")):
607 tmpr.append("The procfs filesystem doesn't seem to be mounted"
608 " under /proc, missing required directory /proc/sys and"
609 " the file /proc/sysrq-trigger")
611 if constants.NV_TIME in what:
612 result[constants.NV_TIME] = utils.SplitTime(time.time())
614 if constants.NV_OSLIST in what:
615 result[constants.NV_OSLIST] = DiagnoseOS()
# NOTE(review): this extract is missing interior lines (lvs dict init,
# separator, continue statements, the virtual-LV skip, the return);
# kept byte-identical.
620 def GetVolumeList(vg_name):
621 """Compute list of logical volumes and their size.
624 @param vg_name: the volume group whose LVs we should list
627 dictionary of all partitions (key) with value being a tuple of
628 their size (in MiB), inactive and online status::
630 {'test1': ('20.06', True, True)}
632 in case of errors, a string is returned with the error
# Query LVM; output fields are pipe-separated name|size|attr.
638 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
639 "--separator=%s" % sep,
640 "-olv_name,lv_size,lv_attr", vg_name])
642 _Fail("Failed to list logical volumes, lvs output: %s", result.output)
644 valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
645 for line in result.stdout.splitlines():
647 match = valid_line_re.match(line)
649 logging.error("Invalid line returned from lvs output: '%s'", line)
651 name, size, attr = match.groups()
# lv_attr positions: [0] type ('v' = virtual), [4] state, [5] open.
652 inactive = attr[4] == '-'
653 online = attr[5] == 'o'
654 virtual = attr[0] == 'v'
656 # we don't want to report such volumes as existing, since they
657 # don't really hold data
659 lvs[name] = (size, inactive, online)
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  # thin RPC wrapper around the utils helper
  return utils.ListVolumeGroups()
676 """List all volumes on this node.
680 A list of dictionaries, each having four keys:
681 - name: the logical volume name,
682 - size: the size of the logical volume
683 - dev: the physical device on which the LV lives
684 - vg: the volume group to which it belongs
686 In case of errors, we return an empty list and log the
689 Note that since a logical volume can live on multiple physical
690 volumes, the resulting list might include a logical volume
694 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
696 "--options=lv_name,lv_size,devices,vg_name"])
698 _Fail("Failed to list logical volumes, lvs output: %s",
702 return dev.split('(')[0]
705 return [parse_dev(x) for x in dev.split(",")]
708 line = [v.strip() for v in line]
709 return [{'name': line[0], 'size': line[1],
710 'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
713 for line in result.stdout.splitlines():
714 if line.count('|') >= 3:
715 all_devs.extend(map_line(line.split('|')))
717 logging.warning("Strange line in the output from lvs: '%s'", line)
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]
  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
737 def GetInstanceList(hypervisor_list):
738 """Provides a list of instances.
740 @type hypervisor_list: list
741 @param hypervisor_list: the list of hypervisors to query information
744 @return: a list of all running instances on the current node
745 - instance1.example.com
746 - instance2.example.com
750 for hname in hypervisor_list:
752 names = hypervisor.GetHypervisor(hname).ListInstances()
753 results.extend(names)
754 except errors.HypervisorError, err:
755 _Fail("Error enumerating instances (hypervisor %s): %s",
756 hname, err, exc=True)
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # iinfo tuple: (name, id, memory, vcpus, state, times)
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output
# NOTE(review): this extract is missing interior lines (the embedded
# original line numbers skip); kept byte-identical.
787 def GetInstanceMigratable(instance):
788 """Gives whether an instance can be migrated.
790 @type instance: L{objects.Instance}
791 @param instance: object representing the instance to be checked.
794 @return: tuple of (result, description) where:
795 - result: whether the instance can be migrated or not
796 - description: a description of the issue, if relevant
799 hyper = hypervisor.GetHypervisor(instance.hypervisor)
800 iname = instance.name
# An instance must be running locally to be migratable.
801 if iname not in hyper.ListInstances():
802 _Fail("Instance %s is not running", iname)
# Every disk must have its /var/run symlink in place.
804 for idx in range(len(instance.disks)):
805 link_name = _GetBlockDevSymlinkPath(iname, idx)
806 if not os.path.islink(link_name):
807 logging.warning("Instance %s is missing symlink %s for disk %d",
808 iname, link_name, idx)
# NOTE(review): this extract is missing interior lines (output dict
# init, the per-instance value dict, duplicate-name guard, return);
# kept byte-identical.
811 def GetAllInstancesInfo(hypervisor_list):
812 """Gather data about all instances.
814 This is the equivalent of L{GetInstanceInfo}, except that it
815 computes data for all instances at once, thus being faster if one
816 needs data about more than one instance.
818 @type hypervisor_list: list
819 @param hypervisor_list: list of hypervisors to query for instance data
822 @return: dictionary of instance: data, with data having the following keys:
823 - memory: memory size of instance (int)
824 - state: xen state of instance (string)
825 - time: cpu time of instance (float)
826 - vcpus: the number of vcpus
831 for hname in hypervisor_list:
832 iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
834 for name, _, memory, vcpus, state, times in iinfo:
842 # we only check static parameters, like memory and vcpus,
843 # and not state and time which can change between the
844 # invocations of the different hypervisors
845 for key in 'memory', 'vcpus':
846 if value[key] != output[name][key]:
847 _Fail("Instance %s is running twice"
848 " with different parameters", name)
def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  base = "%s-%s-%s-%s.log" % (kind, os_name, instance,
                              utils.TimestampForFilename())
  return utils.PathJoin(constants.LOG_OS_DIR, base)
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    # surface the tail of the script log in the RPC failure message
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # BUG FIX: the log message said "os create command" although this is
    # the rename path (copy-paste from InstanceOsAdd), which misleads
    # anyone grepping logs for rename failures.
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for disk C{idx} of an instance."""
  link_basename = "%s:%d" % (instance_name, idx)
  return utils.PathJoin(constants.DISK_LINKS_DIR, link_basename)
944 def _SymlinkBlockDev(instance_name, device_path, idx):
945 """Set up symlinks to a instance's block device.
947 This is an auxiliary function run when an instance is start (on the primary
948 node) or when an instance is migrated (on the target node).
951 @param instance_name: the name of the target instance
952 @param device_path: path of the physical block device, on the node
953 @param idx: the disk index
954 @return: absolute path to the disk's symlink
957 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
959 os.symlink(device_path, link_name)
961 if err.errno == errno.EEXIST:
962 if (not os.path.islink(link_name) or
963 os.readlink(link_name) != device_path):
965 os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        # best-effort cleanup; just record the failure
        logging.exception("Can't remove symlink '%s'", link_name)
# NOTE(review): this extract is missing interior lines (result-list
# init, the device-is-None guard, try/except around the symlink call);
# kept byte-identical.
985 def _GatherAndLinkBlockDevs(instance):
986 """Set up an instance's block device(s).
988 This is run on the primary node at instance startup. The block
989 devices must be already assembled.
991 @type instance: L{objects.Instance}
992 @param instance: the instance whose disks we shoul assemble
994 @return: list of (disk_object, device_path)
# Resolve each configured disk to its assembled block device.
998 for idx, disk in enumerate(instance.disks):
999 device = _RecursiveFindBD(disk)
1001 raise errors.BlockDeviceError("Block device '%s' is not set up." %
# Create the /var/run disk symlink used by hypervisors.
1005 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1007 raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1010 block_devices.append((disk, link_name))
1012 return block_devices
1015 def StartInstance(instance):
1016 """Start an instance.
1018 @type instance: L{objects.Instance}
1019 @param instance: the instance object
1023 running_instances = GetInstanceList([instance.hypervisor])
1025 if instance.name in running_instances:
1026 logging.info("Instance %s already running, not starting", instance.name)
1030 block_devices = _GatherAndLinkBlockDevs(instance)
1031 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1032 hyper.StartInstance(instance, block_devices)
1033 except errors.BlockDeviceError, err:
1034 _Fail("Block device error: %s", err, exc=True)
1035 except errors.HypervisorError, err:
1036 _RemoveBlockDevLinks(instance.name, instance.disks)
1037 _Fail("Hypervisor error: %s", err, exc=True)
# NOTE(review): this extract is missing interior lines (the _TryShutdown
# class header, try:/return lines); kept byte-identical -- reconcile
# with full source before editing.
1040 def InstanceShutdown(instance, timeout):
1041 """Shut an instance down.
1043 @note: this functions uses polling with a hardcoded timeout.
1045 @type instance: L{objects.Instance}
1046 @param instance: the instance object
1047 @type timeout: integer
1048 @param timeout: maximum timeout for soft shutdown
1052 hv_name = instance.hypervisor
1053 hyper = hypervisor.GetHypervisor(hv_name)
1054 iname = instance.name
# Nothing to do when the instance is not running locally.
1056 if instance.name not in hyper.ListInstances():
1057 logging.info("Instance %s not running, doing nothing", iname)
# Helper callable for utils.Retry: soft-stop, asking the hypervisor to
# retry on the second and subsequent attempts.
1062 self.tried_once = False
1065 if iname not in hyper.ListInstances():
1069 hyper.StopInstance(instance, retry=self.tried_once)
1070 except errors.HypervisorError, err:
1071 if iname not in hyper.ListInstances():
1072 # if the instance is no longer existing, consider this a
1073 # success and go to cleanup
1076 _Fail("Failed to stop instance %s: %s", iname, err)
1078 self.tried_once = True
1080 raise utils.RetryAgain()
# Poll soft shutdown every 5 seconds up to the caller's timeout.
1083 utils.Retry(_TryShutdown(), 5, timeout)
1084 except utils.RetryTimeout:
1085 # the shutdown did not succeed
1086 logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
# Escalate to a forced stop (destroy).
1089 hyper.StopInstance(instance, force=True)
1090 except errors.HypervisorError, err:
1091 if iname in hyper.ListInstances():
1092 # only raise an error if the instance still exists, otherwise
1093 # the error could simply be "instance ... unknown"!
1094 _Fail("Failed to force stop instance %s: %s", iname, err)
1098 if iname in hyper.ListInstances():
1099 _Fail("Could not shutdown instance %s even by destroy", iname)
# Best-effort hypervisor cleanup; failure is only logged.
1102 hyper.CleanupInstance(instance.name)
1103 except errors.HypervisorError, err:
1104 logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1106 _RemoveBlockDevLinks(iname, instance.disks)
# NOTE(review): this extract is missing interior lines (try: lines);
# kept byte-identical.
1109 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1110 """Reboot an instance.
1112 @type instance: L{objects.Instance}
1113 @param instance: the instance object to reboot
1114 @type reboot_type: str
1115 @param reboot_type: the type of reboot, one the following
1117 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1118 instance OS, do not recreate the VM
1119 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1120 restart the VM (at the hypervisor level)
1121 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1122 not accepted here, since that mode is handled differently, in
1123 cmdlib, and translates into full stop and start of the
1124 instance (instead of a call_instance_reboot RPC)
1125 @type shutdown_timeout: integer
1126 @param shutdown_timeout: maximum timeout for soft shutdown
# Rebooting a non-running instance is an error.
1130 running_instances = GetInstanceList([instance.hypervisor])
1132 if instance.name not in running_instances:
1133 _Fail("Cannot reboot instance %s that is not running", instance.name)
1135 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1136 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1138 hyper.RebootInstance(instance)
1139 except errors.HypervisorError, err:
1140 _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
# Hard reboot = full local shutdown followed by start.
1141 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1143 InstanceShutdown(instance, shutdown_timeout)
1144 return StartInstance(instance)
1145 except errors.HypervisorError, err:
1146 _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1148 _Fail("Invalid reboot_type received: %s", reboot_type)
1151 def MigrationInfo(instance):
1152 """Gather information about an instance to be migrated.
1154 @type instance: L{objects.Instance}
1155 @param instance: the instance definition
1158 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1160 info = hyper.MigrationInfo(instance)
1161 except errors.HypervisorError, err:
1162 _Fail("Failed to fetch migration information: %s", err, exc=True)
1166 def AcceptInstance(instance, info, target):
1167 """Prepare the node to accept an instance.
1169 @type instance: L{objects.Instance}
1170 @param instance: the instance definition
1171 @type info: string/data (opaque)
1172 @param info: migration information, from the source node
1173 @type target: string
1174 @param target: target host (usually ip), on this node
1177 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1179 hyper.AcceptInstance(instance, info, target)
1180 except errors.HypervisorError, err:
1181 _Fail("Failed to accept instance: %s", err, exc=True)
def FinalizeMigration(instance, info, success):
  """Finalize any preparation to accept an instance.

  Called on the target node after the migration attempt, to commit or
  roll back whatever L{AcceptInstance} set up.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  # NOTE(review): the 'try:' opening this handler appears to have been
  # dropped from this copy.
    hyper.FinalizeMigration(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration: %s", err, exc=True)
def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @rtype: tuple
  @return: a tuple of (success, msg) where:
      - success is a boolean denoting the success/failure of the operation
      - msg is a string with details in case of failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  # NOTE(review): the 'try:' opening this handler appears to have been
  # dropped from this copy.
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete 'size' argument
  # pylint: disable-msg=W0613
  # NOTE(review): several lines appear to have been dropped from this copy
  # (the 'clist' initialization, the 'try:' openers, the child Open() and
  # device.Assemble() calls); reconcile with upstream before running.
  for child in disk.children:
      # assemble each child first, so the parent can be created on top
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device %s: %s", child, err)
    if on_primary or disk.AssembleOnSecondary():
      # we need the children open in case the device itself has to
      # be assembled
        # pylint: disable-msg=E1103
      except errors.BlockDeviceError, err:
        _Fail("Can't make child '%s' read-write: %s", child, err)

    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id
def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # oflag=direct bypasses the page cache so the zeros actually hit the disk
  # NOTE(review): the closing element of the 'cmd' list (presumably a
  # "count=..." argument and the ']') and the 'if result.failed:' guard
  # appear to have been dropped from this copy.
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,

  result = utils.RunCmd(cmd)

    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  # NOTE(review): the 'try:' opener, the handler body (presumably setting
  # rdev to None) and the 'if' guarding the _Fail below appear to have
  # been dropped from this copy.
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:

    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Do cross verify some of the parameters
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  # Errors are collected in 'msgs' so all children are attempted even if
  # one fails; a single _Fail is raised at the end.
  # NOTE(review): the 'msgs' initialization, the 'try:' openers, the
  # 'rdev.Remove()' call and the final 'if msgs:' guard appear to have
  # been dropped from this copy.
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
  if rdev is not None:
    r_path = rdev.dev_path
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
      DevCacheManager.RemoveCache(r_path)

    for child in disk.children:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

    _Fail("; ".join(msgs))
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  # 'mcn' is the maximum number of children that may fail to assemble
  # (None entries) before the whole assembly is aborted.
  # NOTE(review): the 'children' initialization, the 'if disk.children:'
  # and 'try:' openers, the 'raise'/'cdev = None' handler lines and the
  # final return appear to have been dropped from this copy.
    mcn = disk.ChildrenNeeded()
      mcn = 0 # max number of Nones allowed
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        # best-effort: tolerate up to 'mcn' failed children
        logging.error("Error in child activation (but continuing): %s",
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    if as_primary or disk.OpenOnSecondary():
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)
def BlockdevAssemble(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  # NOTE(review): the 'try:' opener and the trailing 'return result'
  # appear to have been dropped from this copy.
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable-msg=E1103
      result = result.dev_path
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown as well.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown

  """
  # Errors are accumulated so every child is still attempted; a single
  # _Fail is raised at the end.
  # NOTE(review): the 'msgs' initialization, the 'try:' openers, the
  # 'r_dev.Shutdown()' call and the final 'if msgs:' guard appear to
  # have been dropped from this copy.
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

    for child in disk.children:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

    _Fail("; ".join(msgs))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  Resolves the parent and every new child to its real block device,
  then attaches the children to the parent mirror.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = []
  for cdev in new_cdevs:
    new_bdevs.append(_RecursiveFindBD(cdev))
  # any unresolved child aborts the whole operation
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  # NOTE(review): the 'devs' initialization and the 'if rpath is None:'
  # branch structure (with its else-side append of 'rpath') appear to
  # have been dropped from this copy.
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
      # no static path: resolve the device dynamically
      bd = _RecursiveFindBD(disk)
        _Fail("Can't find device %s while removing children", disk)
      devs.append(bd.dev_path)
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
  parent_bdev.RemoveChildren(devs)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  # NOTE(review): the 'stats' initialization, the 'for dsk in disks:'
  # loop header, the 'if rbd is None:' guard and the 'return stats'
  # appear to have been dropped from this copy.
    rbd = _RecursiveFindBD(dsk)
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  # children are resolved first so FindDevice can match the full tree
  # NOTE(review): the 'children' initialization (and presumably an
  # 'if disk.children:' guard) appear to have been dropped from this copy.
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open
  @return: the opened real block device (per upstream behaviour)

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)
  # NOTE(review): the trailing 'real_disk.Open()' / 'return real_disk'
  # lines appear to have been dropped from this copy.
def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
      sync status

  """
  # NOTE(review): the 'try:' opener and the 'if rbd is None: return None'
  # guard appear to have been dropped from this copy.
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  return rbd.GetSyncStatus()
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  # NOTE(review): the 'result' initialization, the 'for cf in disks:'
  # loop header, the None-appending branches and the final return
  # appear to have been dropped from this copy.
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:

      result.append(rbd.GetActualSize())
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  # NOTE(review): the final argument to BuildCmd (presumably 'destcmd')
  # and the 'if result.failed:' guard appear to have been dropped from
  # this copy.
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @param data: the new contents of the file (compressed; see _Decompress)
  @param mode: the mode to give the file (can be None)
  @param uid: the owner of the file (can be -1 for default)
  @param gid: the group of the file (can be -1 for default)
  @param atime: the atime to set on the file (can be None)
  @param mtime: the mtime to set on the file (can be None)

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  # whitelist check: only known configuration files may be overwritten
  # NOTE(review): the continuation line of the _Fail call below
  # (presumably 'file_name)') appears to have been dropped from this copy.
  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",

  raw_data = _Decompress(data)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  # NOTE(review): the 'else:' branch (presumably 'detail = str(err)')
  # and the 'return detail' appear to have been dropped from this copy.
  if hasattr(err, 'errno'):
    detail = errno.errorcode[err.errno]
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  # NOTE(review): the 'try:' openers and the closing 'str(err))' of the
  # last error message appear to have been dropped from this copy.
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  # the API file must be a plain regular file
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %

  return True, api_versions
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of support OS API versions

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  # NOTE(review): the 'result' initialization, the 'try:' opener, the
  # post-exception flow control, the 'diagnose' assignments in both
  # branches and the final 'return result' appear to have been dropped
  # from this copy.
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          # invalid OS: no capability data to report
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type name: str
  @param name: the OS name to look up
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  # NOTE(review): several lines appear to have been dropped from this copy
  # (the 'else:'/'if os_dir is None:'/'if not status:' guards, the 'try:'
  # openers, some message continuation lines and the final
  # 'return True, os_obj'); reconcile with upstream before relying on it.
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path names
  os_files = dict.fromkeys(constants.OS_SCRIPTS)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = ''

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = ''
    # older APIs do not ship a verify script
    del os_files[constants.OS_SCRIPT_VERIFY]

  for filename in os_files:
    os_files[filename] = utils.PathJoin(os_dir, filename)

      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %

    # scripts must be executable by the owner
    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %

  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS variants file at %s: %s" %
                     (variants_file, _ErrnoOrStr(err)))
      return False, ("No supported os variant found")

  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, _ErrnoOrStr(err)))
    # each line is "<name> <free-form help text>"
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)
  # NOTE(review): the failure check (presumably 'if not status:
  # _Fail(payload)') and the 'return payload' appear to have been
  # dropped from this copy.
def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables

  """
  # NOTE(review): the 'result' initialization, the start of the
  # 'api_version = ...' assignment, the 'if not variant:' guard and the
  # 'return result' appear to have been dropped from this copy.
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['OS_NAME'] = inst_os.name
  result['DEBUG_LEVEL'] = '%d' % debug

  # OS variants
  if api_version >= constants.OS_API_V15:
    variant = objects.OS.GetVariant(os_name)
      # no variant specified in the name: fall back to the first supported
      variant = inst_os.supported_variants[0]
    result['OS_VARIANT'] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result['OSP_%s' % pname.upper()] = pvalue
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if a disk is not activated

  """
  # start from the core (instance-independent) environment
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)

  # Disks
  # NOTE(review): the 'if nic.ip:' guard before NIC_%d_IP and the final
  # 'return result' appear to have been dropped from this copy.
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
def BlockdevGrow(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @rtype: (status, result)
  @return: a tuple with the status of the operation
      (True/False), and the errors message if status
      is False

  """
  r_dev = _RecursiveFindBD(disk)
  # NOTE(review): the 'if r_dev is None:' guard, the 'try:' opener and
  # the 'r_dev.Grow(amount)' call appear to have been dropped from this
  # copy.
    _Fail("Cannot find block device %s", disk)

  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  # NOTE(review): the continuation of the first _Fail call and the
  # 'else:' lines introducing the two trailing _Fail branches appear to
  # have been dropped from this copy.
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
    # snapshot the backing storage, not the DRBD layer
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
      _Fail("Cannot find block device %s", disk)
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Writes the export config into a ".new" directory first and then
  atomically moves it over the final export directory, so a crash
  mid-write never leaves a half-written export visible.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)

  # NOTE(review): the 'nic_total'/'disk_total' counter initializations and
  # increments, the 'if disk:' guard in the disk loop and the
  # "('%d' % disk.size)" continuation appear to have been dropped from
  # this copy.
  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
                 '%s' % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  for disk_count, disk in enumerate(snap_disks):
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    # cluster-global hypervisor params are not instance-specific
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # replace any stale previous export atomically
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  # NOTE(review): the line loading the file into the parser (presumably
  # 'config.read(cff)') appears to have been dropped from this copy.

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
2217 """Return a list of exports currently available on this machine.
2220 @return: list of the exports
2223 if os.path.isdir(constants.EXPORT_DIR):
2224 return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2226 _Fail("No exports directory")
def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove

  """
  target = utils.PathJoin(constants.EXPORT_DIR, export)

  # NOTE(review): the 'try:' opening this handler appears to have been
  # dropped from this copy.
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)
def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
      rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  # Failures are collected per device so the remaining renames still run.
  # NOTE(review): the 'msgs' initialization, the 'continue' after a
  # missing device, the 'try:' opener and the final 'if msgs:'-style
  # guard appear to have been dropped from this copy.
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
      msgs.append("Can't find device %s in rename" % str(disk))
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)

    _Fail("; ".join(msgs))
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check
  @rtype: str
  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  # NOTE(review): 'cfg' is referenced below but never assigned in this
  # copy — its assignment (presumably a config-reader constructor) appears
  # to have been dropped; verify against upstream.
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # commonprefix check: reject paths escaping the base directory
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
      base_file_storage_dir):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
2311 def CreateFileStorageDir(file_storage_dir):
2312 """Create file storage directory.
2314 @type file_storage_dir: str
2315 @param file_storage_dir: directory to create
2318 @return: tuple with first element a boolean indicating wheter dir
2319 creation was successful or not
# Validate/normalize against the cluster base directory first; this
# raises RPCFail for paths outside the allowed tree.
2322 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2323 if os.path.exists(file_storage_dir):
# An existing non-directory at the target path is an error; an existing
# directory is implicitly accepted.
2324 if not os.path.isdir(file_storage_dir):
2325 _Fail("Specified storage dir '%s' is not a directory",
# Create the directory tree with mode 0750 (owner rwx, group rx).
2329 os.makedirs(file_storage_dir, 0750)
2330 except OSError, err:
2331 _Fail("Cannot create file storage directory '%s': %s",
2332 file_storage_dir, err, exc=True)
2335 def RemoveFileStorageDir(file_storage_dir):
2336 """Remove file storage directory.
2338 Remove it only if it's empty. If not log an error and return.
2340 @type file_storage_dir: str
2341 @param file_storage_dir: the directory we should cleanup
2342 @rtype: tuple (success,)
2343 @return: tuple of one element, C{success}, denoting
2344 whether the operation was successful
# Validate/normalize the path; raises RPCFail if outside the base dir.
2347 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2348 if os.path.exists(file_storage_dir):
2349 if not os.path.isdir(file_storage_dir):
2350 _Fail("Specified Storage directory '%s' is not a directory",
2352 # deletes dir only if empty, otherwise we want to fail the rpc call
2354 os.rmdir(file_storage_dir)
2355 except OSError, err:
2356 _Fail("Cannot remove file storage directory '%s': %s",
2357 file_storage_dir, err)
2360 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2361 """Rename the file storage directory.
2363 @type old_file_storage_dir: str
2364 @param old_file_storage_dir: the current path
2365 @type new_file_storage_dir: str
2366 @param new_file_storage_dir: the name we should rename to
2367 @rtype: tuple (success,)
2368 @return: tuple of one element, C{success}, denoting
2369 whether the operation was successful
# Both paths must validate against the cluster base directory.
2372 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2373 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
# Only rename when the destination does not exist yet and the source is
# a real directory; anything else is reported via _Fail below.
2374 if not os.path.exists(new_file_storage_dir):
2375 if os.path.isdir(old_file_storage_dir):
2377 os.rename(old_file_storage_dir, new_file_storage_dir)
2378 except OSError, err:
2379 _Fail("Cannot rename '%s' to '%s': %s",
2380 old_file_storage_dir, new_file_storage_dir, err)
2382 _Fail("Specified storage dir '%s' is not a directory",
2383 old_file_storage_dir)
# Destination already exists: only an error if the source also still
# exists (i.e. the rename genuinely cannot proceed).
2385 if os.path.exists(old_file_storage_dir):
2386 _Fail("Cannot rename '%s' to '%s': both locations exist",
2387 old_file_storage_dir, new_file_storage_dir)
2390 def _EnsureJobQueueFile(file_name):
2391 """Checks whether the given filename is in the queue directory.
2393 @type file_name: str
2394 @param file_name: the file name we should check
2396 @raises RPCFail: if the file is not valid
2399 queue_dir = os.path.normpath(constants.QUEUE_DIR)
# String-prefix containment check: the file must live under QUEUE_DIR.
2400 result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
# When the check fails, reject the file via _Fail (raises RPCFail).
2403 _Fail("Passed job queue file '%s' does not belong to"
2404 " the queue directory '%s'", file_name, queue_dir)
2407 def JobQueueUpdate(file_name, content):
2408 """Updates a file in the queue directory.
2410 This is just a wrapper over L{utils.WriteFile}, with proper
2413 @type file_name: str
2414 @param file_name: the job file name
2416 @param content: the new job contents
2418 @return: the success of the operation
# Reject file names outside the queue directory (raises RPCFail).
2421 _EnsureJobQueueFile(file_name)
# Queue files are owned by the master daemon user/group.
2422 getents = runtime.GetEnts()
2424 # Write and replace the file atomically
# NOTE(review): content appears to be compressed on the wire and is
# expanded by _Decompress before writing — _Decompress not visible here.
2425 utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2426 gid=getents.masterd_gid)
2429 def JobQueueRename(old, new):
2430 """Renames a job queue file.
2432 This is just a wrapper over os.rename with proper checking.
2435 @param old: the old (actual) file name
2437 @param new: the desired file name
2439 @return: the success of the operation and payload
# Both source and destination must be inside the queue directory.
2442 _EnsureJobQueueFile(old)
2443 _EnsureJobQueueFile(new)
# mkdir=True creates the destination's parent directory if needed
# (used e.g. when archiving jobs into subdirectories).
2445 utils.RenameFile(old, new, mkdir=True)
2448 def BlockdevClose(instance_name, disks):
2449 """Closes the given block devices.
2451 This means they will be switched to secondary mode (in case of
2454 @param instance_name: if the argument is not empty, the symlinks
2455 of this instance will be removed
2456 @type disks: list of L{objects.Disk}
2457 @param disks: the list of disks to be closed
2458 @rtype: tuple (success, message)
2459 @return: a tuple of success and message, where success
2460 indicates the succes of the operation, and message
2461 which will contain the error details in case we
# Each disk must be resolvable to a live block device object.
2467 rd = _RecursiveFindBD(cf)
2469 _Fail("Can't find device %s", cf)
# Close errors are collected per-device and reported together.
2476 except errors.BlockDeviceError, err:
2477 msg.append(str(err))
2479 _Fail("Can't make devices secondary: %s", ",".join(msg))
# On success, drop the instance's device symlinks (if a name was given).
2482 _RemoveBlockDevLinks(instance_name, disks)
2485 def ValidateHVParams(hvname, hvparams):
2486 """Validates the given hypervisor parameters.
2488 @type hvname: string
2489 @param hvname: the hypervisor name
2490 @type hvparams: dict
2491 @param hvparams: the hypervisor parameters to be validated
# Delegate validation to the hypervisor class itself; translate its
# HypervisorError into an RPCFail so the caller gets a clean message.
2496 hv_type = hypervisor.GetHypervisor(hvname)
2497 hv_type.ValidateParameters(hvparams)
2498 except errors.HypervisorError, err:
# log=False: the error text is already meaningful, no need to log here.
2499 _Fail(str(err), log=False)
2502 def _CheckOSPList(os_obj, parameters):
2503 """Check whether a list of parameters is supported by the OS.
2505 @type os_obj: L{objects.OS}
2506 @param os_obj: OS object to check
2507 @type parameters: list
2508 @param parameters: the list of parameters to check
# supported_parameters holds (name, description) pairs; only the names
# are relevant for the membership check.
2511 supported = [v[0] for v in os_obj.supported_parameters]
2512 delta = frozenset(parameters).difference(supported)
# Any parameter not in the supported set is an error (raises RPCFail).
2514 _Fail("The following parameters are not supported"
2515 " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2518 def ValidateOS(required, osname, checks, osparams):
2519 """Validate the given OS' parameters.
2521 @type required: boolean
2522 @param required: whether absence of the OS should translate into
2524 @type osname: string
2525 @param osname: the OS to be validated
2527 @param checks: list of the checks to run (currently only 'parameters')
2528 @type osparams: dict
2529 @param osparams: dictionary with OS parameters
2531 @return: True if the validation passed, or False if the OS was not
2532 found and L{required} was false
# Reject check names this node does not know how to run.
2535 if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2536 _Fail("Unknown checks required for OS %s: %s", osname,
2537 set(checks).difference(constants.OS_VALIDATE_CALLS))
# Strip any variant suffix and load the OS definition from disk;
# tbv ("to be validated") is the OS object on success.
2539 name_only = objects.OS.GetName(osname)
2540 status, tbv = _TryOSFromDisk(name_only, None)
# Pre-API-20 OS definitions have no validation support.
2548 if max(tbv.api_versions) < constants.OS_API_V20:
2551 if constants.OS_VALIDATE_PARAMETERS in checks:
2552 _CheckOSPList(tbv, osparams.keys())
# Run the OS' own verify script with the parameters in its environment.
2554 validate_env = OSCoreEnv(osname, tbv, osparams)
2555 result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2558 logging.error("os validate command '%s' returned error: %s output: %s",
2559 result.cmd, result.fail_reason, result.output)
2560 _Fail("OS validation script failed (%s), output: %s",
2561 result.fail_reason, result.output, log=False)
2567 """Demotes the current node from master candidate role.
2570 # try to ensure we're not the master by mistake
2571 master, myself = ssconf.GetMasterAndMyself()
2572 if master == myself:
2573 _Fail("ssconf status shows I'm the master node, will not demote")
2575 result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2576 if not result.failed:
2577 _Fail("The master daemon is running, will not demote")
2580 if os.path.isfile(constants.CLUSTER_CONF_FILE):
2581 utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2582 except EnvironmentError, err:
2583 if err.errno != errno.ENOENT:
2584 _Fail("Error while backing up cluster file: %s", err, exc=True)
2586 utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2589 def _GetX509Filenames(cryptodir, name):
2590 """Returns the full paths for the private key and certificate.
# Returns a 3-tuple: (certificate directory, key file path, cert file
# path), all under cryptodir/name.
2593 return (utils.PathJoin(cryptodir, name),
2594 utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2595 utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2598 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2599 """Creates a new X509 certificate for SSL/TLS.
2602 @param validity: Validity in seconds
2603 @rtype: tuple; (string, string)
2604 @return: Certificate name and public part
# Validity is capped at _MAX_SSL_CERT_VALIDITY (one week).
2607 (key_pem, cert_pem) = \
2608 utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2609 min(validity, _MAX_SSL_CERT_VALIDITY))
# A fresh, uniquely named directory holds the key/cert pair; its
# basename doubles as the certificate's public name.
2611 cert_dir = tempfile.mkdtemp(dir=cryptodir,
2612 prefix="x509-%s-" % utils.TimestampForFilename())
2614 name = os.path.basename(cert_dir)
2615 assert len(name) > 5
2617 (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
# 0400: readable only by the owner — these are secrets.
2619 utils.WriteFile(key_file, mode=0400, data=key_pem)
2620 utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2622 # Never return private key as it shouldn't leave the node
2623 return (name, cert_pem)
# Cleanup path: remove the partially written directory on failure.
2625 shutil.rmtree(cert_dir, ignore_errors=True)
2629 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2630 """Removes a X509 certificate.
2633 @param name: Certificate name
# Remove the key and certificate files, then (per the error message
# below) the containing directory; any environment error is fatal.
2636 (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2638 utils.RemoveFile(key_file)
2639 utils.RemoveFile(cert_file)
2643 except EnvironmentError, err:
2644 _Fail("Cannot remove certificate directory '%s': %s",
2648 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2649 """Returns the command for the requested input/output.
2651 @type instance: L{objects.Instance}
2652 @param instance: The instance object
2653 @param mode: Import/export mode
2654 @param ieio: Input/output type
2655 @param ieargs: Input/output arguments
2658 assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
# --- Case 1: plain file under the exports directory -----------------
2665 if ieio == constants.IEIO_FILE:
2666 (filename, ) = ieargs
2668 if not utils.IsNormAbsPath(filename):
2669 _Fail("Path '%s' is not normalized or absolute", filename)
2671 directory = os.path.normpath(os.path.dirname(filename))
# String-prefix containment check against EXPORT_DIR.
2673 if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2674 constants.EXPORT_DIR):
2675 _Fail("File '%s' is not under exports directory '%s'",
2676 filename, constants.EXPORT_DIR)
2679 utils.Makedirs(directory, mode=0750)
2681 quoted_filename = utils.ShellQuote(filename)
# Import writes into the file, export reads from it — hence the shell
# redirection suffix attached to the transfer pipeline.
2683 if mode == constants.IEM_IMPORT:
2684 suffix = "> %s" % quoted_filename
2685 elif mode == constants.IEM_EXPORT:
2686 suffix = "< %s" % quoted_filename
2688 # Retrieve file size
2690 st = os.stat(filename)
2691 except EnvironmentError, err:
2692 logging.error("Can't stat(2) %s: %s", filename, err)
2694 exp_size = utils.BytesToMebibyte(st.st_size)
# --- Case 2: raw block device ---------------------------------------
2696 elif ieio == constants.IEIO_RAW_DISK:
2699 real_disk = _OpenRealBD(disk)
2701 if mode == constants.IEM_IMPORT:
2702 # we set here a smaller block size as, due to transport buffering, more
2703 # than 64-128k will mostly ignored; we use nocreat to fail if the device
2704 # is not already there or we pass a wrong path; we use notrunc to no
2705 # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2706 # much memory; this means that at best, we flush every 64k, which will
2708 suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2709 " bs=%s oflag=dsync"),
2713 elif mode == constants.IEM_EXPORT:
2714 # the block size on the read dd is 1MiB to match our units
2715 prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2717 str(1024 * 1024), # 1 MB
2719 exp_size = disk.size
# --- Case 3: OS import/export script --------------------------------
2721 elif ieio == constants.IEIO_SCRIPT:
2722 (disk, disk_index, ) = ieargs
2724 assert isinstance(disk_index, (int, long))
2726 real_disk = _OpenRealBD(disk)
2728 inst_os = OSFromDisk(instance.os)
2729 env = OSEnvironment(instance, inst_os)
2731 if mode == constants.IEM_IMPORT:
2732 env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2733 env["IMPORT_INDEX"] = str(disk_index)
2734 script = inst_os.import_script
2736 elif mode == constants.IEM_EXPORT:
2737 env["EXPORT_DEVICE"] = real_disk.dev_path
2738 env["EXPORT_INDEX"] = str(disk_index)
2739 script = inst_os.export_script
2741 # TODO: Pass special environment only to script
2742 script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2744 if mode == constants.IEM_IMPORT:
2745 suffix = "| %s" % script_cmd
2747 elif mode == constants.IEM_EXPORT:
2748 prefix = "%s |" % script_cmd
2750 # Let script predict size
2751 exp_size = constants.IE_CUSTOM_SIZE
# Unknown I/O type: hard failure.
2754 _Fail("Invalid %s I/O mode %r", mode, ieio)
# env: extra environment for the daemon; prefix/suffix: shell fragments
# placed before/after the transfer; exp_size: expected size in MiB (or
# IE_CUSTOM_SIZE / possibly unset on stat failure).
2756 return (env, prefix, suffix, exp_size)
2759 def _CreateImportExportStatusDir(prefix):
2760 """Creates status directory for import/export.
# The unique basename of this directory becomes the public name of the
# import/export operation (see StartImportExportDaemon).
2763 return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2765 (prefix, utils.TimestampForFilename())))
2768 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2769 """Starts an import or export daemon.
2771 @param mode: Import/output mode
2772 @type opts: L{objects.ImportExportOptions}
2773 @param opts: Daemon options
2775 @param host: Remote host for export (None for import)
2777 @param port: Remote port for export (None for import)
2778 @type instance: L{objects.Instance}
2779 @param instance: Instance object
2780 @param ieio: Input/output type
2781 @param ieioargs: Input/output arguments
# Import listens locally, so host/port must NOT be given; export
# connects out, so both are mandatory.
2784 if mode == constants.IEM_IMPORT:
2787 if not (host is None and port is None):
2788 _Fail("Can not specify host or port on import")
2790 elif mode == constants.IEM_EXPORT:
2793 if host is None or port is None:
2794 _Fail("Host and port must be specified for an export")
2797 _Fail("Invalid mode %r", mode)
# key_name and ca_pem must be both set or both unset (XOR check).
2799 if (opts.key_name is None) ^ (opts.ca_pem is None):
2800 _Fail("Cluster certificate can only be used for both key and CA")
2802 (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2803 _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
# Without an explicit key, fall back to the node daemon's own
# certificate (which contains both key and cert).
2805 if opts.key_name is None:
2807 key_path = constants.NODED_CERT_FILE
2808 cert_path = constants.NODED_CERT_FILE
2809 assert opts.ca_pem is None
2811 (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2813 assert opts.ca_pem is not None
2815 for i in [key_path, cert_path]:
2816 if not os.path.exists(i):
2817 _Fail("File '%s' does not exist" % i)
# The status directory's basename is the operation's public name.
2819 status_dir = _CreateImportExportStatusDir(prefix)
2821 status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2822 pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2823 ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2825 if opts.ca_pem is None:
# Use the node certificate as CA when none was supplied.
2827 ca = utils.ReadFile(constants.NODED_CERT_FILE)
2832 utils.WriteFile(ca_file, data=ca, mode=0400)
# Assemble the daemon command line from the options gathered above.
2835 constants.IMPORT_EXPORT_DAEMON,
2837 "--key=%s" % key_path,
2838 "--cert=%s" % cert_path,
2839 "--ca=%s" % ca_file,
2843 cmd.append("--host=%s" % host)
2846 cmd.append("--port=%s" % port)
2849 cmd.append("--compress=%s" % opts.compress)
2852 cmd.append("--magic=%s" % opts.magic)
2854 if exp_size is not None:
2855 cmd.append("--expected-size=%s" % exp_size)
2858 cmd.append("--cmd-prefix=%s" % cmd_prefix)
2861 cmd.append("--cmd-suffix=%s" % cmd_suffix)
2863 logfile = _InstanceLogName(prefix, instance.os, instance.name)
2865 # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2866 # support for receiving a file descriptor for output
2867 utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2870 # The import/export name is simply the status directory name
2871 return os.path.basename(status_dir)
# Cleanup path: drop the status directory if daemon startup failed.
2874 shutil.rmtree(status_dir, ignore_errors=True)
2878 def GetImportExportStatus(names):
2879 """Returns import/export daemon status.
2881 @type names: sequence
2882 @param names: List of names
2883 @rtype: List of dicts
2884 @return: Returns a list of the state of each named import/export or None if a
2885 status couldn't be read
# One status file per named operation, under IMPORT_EXPORT_DIR/<name>.
2891 status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2895 data = utils.ReadFile(status_file)
# A missing status file (ENOENT) is tolerated; other errors are not.
2896 except EnvironmentError, err:
2897 if err.errno != errno.ENOENT:
# Status files contain JSON-serialized state.
2905 result.append(serializer.LoadJson(data))
2910 def AbortImportExport(name):
2911 """Sends SIGTERM to a running import/export daemon.
2914 logging.info("Abort import/export %s", name)
2916 status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
# The PID file is read under lock; a stale/absent file yields no PID.
2917 pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2920 logging.info("Import/export %s is running with PID %s, sending SIGTERM",
# A process that already exited between the read and the kill is fine.
2922 utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2925 def CleanupImportExport(name):
2926 """Cleanup after an import or export.
2928 If the import/export daemon is still running it's killed. Afterwards the
2929 whole status directory is removed.
2932 logging.info("Finalizing import/export %s", name)
2934 status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2936 pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2939 logging.info("Import/export %s is still running with PID %s",
# waitpid=False: don't block on the dying daemon before cleanup.
2941 utils.KillProcess(pid, waitpid=False)
2943 shutil.rmtree(status_dir, ignore_errors=True)
2946 def _FindDisks(nodes_ip, disks):
2947 """Sets the physical ID on disks and returns the block devices.
2950 # set the correct physical ID
2951 my_name = netutils.Hostname.GetSysName()
# Resolve each disk's physical ID relative to this node's hostname.
2953 cf.SetPhysicalID(my_name, nodes_ip)
2958 rd = _RecursiveFindBD(cf)
# Every disk must map to an existing block device.
2960 _Fail("Can't find device %s", cf)
2965 def DrbdDisconnectNet(nodes_ip, disks):
2966 """Disconnects the network on a list of drbd devices.
2969 bdevs = _FindDisks(nodes_ip, disks)
# Switching to standalone mode detaches the DRBD network; any device
# error aborts the whole call.
2975 except errors.BlockDeviceError, err:
2976 _Fail("Can't change network configuration to standalone mode: %s",
2980 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2981 """Attaches the network on a list of drbd devices.
2984 bdevs = _FindDisks(nodes_ip, disks)
# Create the instance's per-disk symlinks before reconnecting.
2987 for idx, rd in enumerate(bdevs):
2989 _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2990 except EnvironmentError, err:
2991 _Fail("Can't create symlink: %s", err)
2992 # reconnect disks, switch to new master configuration and if
2993 # needed primary mode
2996 rd.AttachNet(multimaster)
2997 except errors.BlockDeviceError, err:
2998 _Fail("Can't change network configuration: %s", err)
3000 # wait until the disks are connected; we need to retry the re-attach
3001 # if the device becomes standalone, as this might happen if the one
3002 # node disconnects and reconnects in a different mode before the
3003 # other node reconnects; in this case, one or both of the nodes will
3004 # decide it has wrong configuration and switch to standalone
# Retry helper: succeeds only once every device is connected/resyncing.
3007 all_connected = True
3010 stats = rd.GetProcStatus()
3012 all_connected = (all_connected and
3013 (stats.is_connected or stats.is_in_resync))
3015 if stats.is_standalone:
3016 # peer had different config info and this node became
3017 # standalone, even though this should not happen with the
3018 # new staged way of changing disk configs
3020 rd.AttachNet(multimaster)
3021 except errors.BlockDeviceError, err:
3022 _Fail("Can't change network configuration: %s", err)
3024 if not all_connected:
3025 raise utils.RetryAgain()
3028 # Start with a delay of 100 miliseconds and go up to 5 seconds
# Exponential backoff (factor 1.5) with an overall 2-minute deadline.
3029 utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3030 except utils.RetryTimeout:
3031 _Fail("Timeout in disk reconnecting")
3034 # change to primary mode
3038 except errors.BlockDeviceError, err:
3039 _Fail("Can't change to primary mode: %s", err)
3042 def DrbdWaitSync(nodes_ip, disks):
3043 """Wait until DRBDs have synchronized.
# Retry helper: raise RetryAgain while the device is neither connected
# nor resyncing; return its /proc status once it is.
3047 stats = rd.GetProcStatus()
3048 if not (stats.is_connected or stats.is_in_resync):
3049 raise utils.RetryAgain()
3052 bdevs = _FindDisks(nodes_ip, disks)
3058 # poll each second for 15 seconds
3059 stats = utils.Retry(_helper, 1, 15, args=[rd])
3060 except utils.RetryTimeout:
# On timeout, take one final reading before deciding it's an error.
3061 stats = rd.GetProcStatus()
3063 if not (stats.is_connected or stats.is_in_resync):
3064 _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
# Aggregate across devices: all done, and the minimum sync percentage.
3065 alldone = alldone and (not stats.is_in_resync)
3066 if stats.sync_percent is not None:
3067 min_resync = min(min_resync, stats.sync_percent)
3069 return (alldone, min_resync)
3072 def GetDrbdUsermodeHelper():
3073 """Returns DRBD usermode helper currently configured.
# Delegate to bdev; a BlockDeviceError presumably becomes an RPCFail in
# the elided except body below.
3077 return bdev.BaseDRBD.GetUsermodeHelper()
3078 except errors.BlockDeviceError, err:
3082 def PowercycleNode(hypervisor_type):
3083 """Hard-powercycle the node.
3085 Because we need to return first, and schedule the powercycle in the
3086 background, we won't be able to report failures nicely.
3089 hyper = hypervisor.GetHypervisor(hypervisor_type)
3093 # if we can't fork, we'll pretend that we're in the child process
# The parent returns this message immediately; the child process
# proceeds to powercycle the node after a short grace period.
3096 return "Reboot scheduled in 5 seconds"
3097 # ensure the child is running on ram
# Best-effort: failures to lock memory are deliberately ignored, since
# the node is about to be power-cycled anyway.
3100 except Exception: # pylint: disable-msg=W0703
3103 hyper.PowercycleNode()
3106 class HooksRunner(object):
3109 This class is instantiated on the node side (ganeti-noded) and not
3113 def __init__(self, hooks_base_dir=None):
3114 """Constructor for hooks runner.
3116 @type hooks_base_dir: str or None
3117 @param hooks_base_dir: if not None, this overrides the
3118 L{constants.HOOKS_BASE_DIR} (useful for unittests)
3121 if hooks_base_dir is None:
3122 hooks_base_dir = constants.HOOKS_BASE_DIR
3123 # yeah, _BASE_DIR is not valid for attributes, we use it like a
3125 self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3127 def RunHooks(self, hpath, phase, env):
3128 """Run the scripts in the hooks directory.
3131 @param hpath: the path to the hooks directory which
3134 @param phase: either L{constants.HOOKS_PHASE_PRE} or
3135 L{constants.HOOKS_PHASE_POST}
3137 @param env: dictionary with the environment for the hook
3139 @return: list of 3-element tuples:
3141 - script result, either L{constants.HKR_SUCCESS} or
3142 L{constants.HKR_FAIL}
3143 - output of the script
3145 @raise errors.ProgrammerError: for invalid input
# The phase determines the directory suffix ("pre"/"post" per the
# "%s-%s.d" pattern below); unknown phases are rejected.
3149 if phase == constants.HOOKS_PHASE_PRE:
3151 elif phase == constants.HOOKS_PHASE_POST:
3154 _Fail("Unknown hooks phase '%s'", phase)
3157 subdir = "%s-%s.d" % (hpath, suffix)
3158 dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3162 if not os.path.isdir(dir_name):
3163 # for non-existing/non-dirs, we simply exit instead of logging a
3164 # warning at every operation
# reset_env=True: hooks see only the environment we pass, nothing
# inherited from the daemon.
3167 runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
# Map each RunParts status onto a hook result code plus output text.
3169 for (relname, relstatus, runresult) in runparts_results:
3170 if relstatus == constants.RUNPARTS_SKIP:
3171 rrval = constants.HKR_SKIP
3173 elif relstatus == constants.RUNPARTS_ERR:
3174 rrval = constants.HKR_FAIL
3175 output = "Hook script execution error: %s" % runresult
3176 elif relstatus == constants.RUNPARTS_RUN:
3177 if runresult.failed:
3178 rrval = constants.HKR_FAIL
3180 rrval = constants.HKR_SUCCESS
3181 output = utils.SafeEncode(runresult.output.strip())
3182 results.append(("%s/%s" % (subdir, relname), rrval, output))
3187 class IAllocatorRunner(object):
3188 """IAllocator runner.
3190 This class is instantiated on the node side (ganeti-noded) and not on
3195 def Run(name, idata):
3196 """Run an iallocator script.
3199 @param name: the iallocator script name
3201 @param idata: the allocator input data
3204 @return: two element tuple of:
3206 - either error message or stdout of allocator (for success)
# The script is looked up in the configured iallocator search path.
3209 alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3211 if alloc_script is None:
3212 _Fail("iallocator module '%s' not found in the search path", name)
# The allocator input is written to a temporary file which is passed
# to the script as its single argument.
3214 fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3218 result = utils.RunCmd([alloc_script, fin_name])
3220 _Fail("iallocator module '%s' failed: %s, output '%s'",
3221 name, result.fail_reason, result.output)
3225 return result.stdout
3228 class DevCacheManager(object):
3229 """Simple class for managing a cache of block device information.
# Cache files live under BDEV_CACHE_DIR, one per device, named after
# the device path with "/dev/" stripped and slashes flattened.
3232 _DEV_PREFIX = "/dev/"
3233 _ROOT_DIR = constants.BDEV_CACHE_DIR
3236 def _ConvertPath(cls, dev_path):
3237 """Converts a /dev/name path to the cache file name.
3239 This replaces slashes with underscores and strips the /dev
3240 prefix. It then returns the full path to the cache file.
3243 @param dev_path: the C{/dev/} path name
3245 @return: the converted path name
3248 if dev_path.startswith(cls._DEV_PREFIX):
3249 dev_path = dev_path[len(cls._DEV_PREFIX):]
3250 dev_path = dev_path.replace("/", "_")
3251 fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3255 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3256 """Updates the cache information for a given device.
3259 @param dev_path: the pathname of the device
3261 @param owner: the owner (instance name) of the device
3262 @type on_primary: bool
3263 @param on_primary: whether this is the primary
3266 @param iv_name: the instance-visible name of the
3267 device, as in objects.Disk.iv_name
# A None dev_path is logged and ignored — cache updates are strictly
# best-effort and must never fail the caller.
3272 if dev_path is None:
3273 logging.error("DevCacheManager.UpdateCache got a None dev_path")
3275 fpath = cls._ConvertPath(dev_path)
3281 iv_name = "not_visible"
# Cache file format: "<owner> <state> <iv_name>\n".
3282 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3284 utils.WriteFile(fpath, data=fdata)
3285 except EnvironmentError, err:
3286 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3289 def RemoveCache(cls, dev_path):
3290 """Remove data for a dev_path.
3292 This is just a wrapper over L{utils.RemoveFile} with a converted
3293 path name and logging.
3296 @param dev_path: the pathname of the device
# Same best-effort contract as UpdateCache: log and continue on error.
3301 if dev_path is None:
3302 logging.error("DevCacheManager.RemoveCache got a None dev_path")
3304 fpath = cls._ConvertPath(dev_path)
3306 utils.RemoveFile(fpath)
3307 except EnvironmentError, err:
3308 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)