4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25 the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27 in the L{_CleanDirectory} function
31 # pylint: disable-msg=E1103
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
# File whose contents change on every boot; used to detect node restarts
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
# Whitelist of directories that _CleanDirectory is allowed to operate on
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.CRYPTO_KEYS_DIR,
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60  # 7 days, in seconds
_X509_KEY_FILE = "key"  # basename for generated X509 private keys
_X509_CERT_FILE = "cert"  # basename for generated X509 certificates
_IES_STATUS_FILE = "status"  # import/export daemon status file name
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message; raised (via L{_Fail}) by RPC
  entry points in this module and handled specially by the node daemon,
  which turns it into a 'failed' RPC result for the master.

  """
def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @keyword log: whether to log the error (defaults to True)
  @keyword exc: whether to log the current exception's traceback
  @raise RPCFail: always

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      # include the currently-handled exception's traceback in the log
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
111 """Simple wrapper to return a SimpleStore.
113 @rtype: L{ssconf.SimpleStore}
114 @return: a SimpleStore instance
117 return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  # the RPC client always sends a two-element (encoding, content) pair
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    # content was sent verbatim
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # content is a base64-encoded, zlib-compressed payload
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  Only directories listed in L{_ALLOWED_CLEAN_DIRS} may be cleaned;
  anything else causes an RPC failure.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
  if not os.path.isdir(path):
  # Normalize excluded paths
  exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
    # only plain files are removed; symlinks and subdirectories survive
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  @rtype: frozenset
  @return: the set of file paths that UploadFile may write

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,

  # each hypervisor contributes its own ancillary files
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed_files)


# computed once at import time; see _BuildUploadFileList
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
211 """Removes job queue files and archived jobs.
217 _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
218 _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
222 """Returns master information.
224 This is an utility function to compute master information, either
225 for consumption here or from the node daemon.
228 @return: master_netdev, master_ip, master_name, primary_ip_family
229 @raise RPCFail: in case of errors
234 master_netdev = cfg.GetMasterNetdev()
235 master_ip = cfg.GetMasterIP()
236 master_node = cfg.GetMasterNode()
237 primary_ip_family = cfg.GetPrimaryIPFamily()
238 except errors.ConfigurationError, err:
239 _Fail("Cluster configuration incomplete: %s", err, exc=True)
240 return (master_netdev, master_ip, master_node, primary_ip_family)
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will either try activate the IP address of the master
  (unless someone else has it) or also start the master daemons, based
  on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to start the master daemons
      (ganeti-masterd and ganeti-rapi), or (if false) activate the
      master IP
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  # either start the master and rapi daemons
      masterd_args = "--no-voting --yes-do-it"
      "EXTRA_MASTERD_ARGS": masterd_args,
    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
      msg = "Can't start Ganeti master: %s" % result.output
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
      if netutils.IPAddress.Own(master_ip):
        # we already have the ip:
        logging.debug("Master IP already configured, doing nothing")
        msg = "Someone else has the master ip, not activating"
      # pick the address class matching the cluster's primary IP family
      ipcls = netutils.IP4Address
      if family == netutils.IP6Address.family:
        ipcls = netutils.IP6Address

      # add the master IP as a labelled secondary address on master_netdev
      result = utils.RunCmd(["ip", "address", "add",
                             "%s/%d" % (master_ip, ipcls.iplen),
                             "dev", master_netdev, "label",
                             "%s:0" % master_netdev])
        msg = "Can't activate master IP: %s" % result.output

      # we ignore the exit code of the following cmds
      # (gratuitous ARP / neighbor discovery to update peers' caches)
      if ipcls == netutils.IP4Address:
        utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
                      master_ip, master_ip])
      elif ipcls == netutils.IP6Address:
        utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])

    _Fail("; ".join(err_msgs))
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _, family = GetMasterInfo()

  ipcls = netutils.IP4Address
  if family == netutils.IP6Address.family:
    ipcls = netutils.IP6Address

  # remove the master IP from the master netdev
  result = utils.RunCmd(["ip", "address", "del",
                         "%s/%d" % (master_ip, ipcls.iplen),
                         "dev", master_netdev])
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                    result.cmd, result.exit_code, result.output)
def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      # BUGFIX: the original code merely instantiated RPCFail without
      # raising it, so the error was silently discarded; _Fail logs and
      # raises properly
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " given")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
      # remove this node's ssh identity and revoke its authorization
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      # best-effort: log but continue with the remaining cleanup
      logging.exception("Error while processing ssh files")

    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable-msg=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
    vginfo = _GetVGInfo(vgname)
    outputarray['vg_size'] = vginfo['vg_size']
    outputarray['vg_free'] = vginfo['vg_free']

    hyper = hypervisor.GetHypervisor(hypervisor_type)
    hyp_info = hyper.GetNodeInfo()
    if hyp_info is not None:
      # merge hypervisor-provided keys (memory_*, cpu counts, ...)
      outputarray.update(hyp_info)

  # boot_id changes on every reboot, letting the master detect restarts
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: dict
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        # report the failure as the check's value instead of failing the RPC
        val = "Error while checking hypervisor: %s" % str(err)

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # shuffle so that not all nodes hammer the same peers at once
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    # first pass: find our own primary/secondary IPs in the node list
    for name, pip, sip in what[constants.NV_NODENETTEST]:
        tmp[my_name] = ("Can't find my own primary/secondary IP"
      # second pass: ping every node's primary and secondary IP
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
          tmp[name] = ("failure using the %s interface(s)" %

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      # we are the master: ping over loopback
      source = constants.IP4_ADDRESS_LOCALHOST
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,

  if constants.NV_LVLIST in what:
      val = GetVolumeList(what[constants.NV_LVLIST])
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what:
    # GetInstanceList can fail
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      # report the error string instead of a minor list
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    # (seconds, microseconds) pair for clock-skew checking on the master
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what:
    result[constants.NV_OSLIST] = DiagnoseOS()
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  # matches "name|size|attr" rows produced by the lvs invocation above
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    match = valid_line_re.match(line)
      logging.error("Invalid line returned from lvs output: '%s'", line)
    name, size, attr = match.groups()
    # decode the lv_attr flag characters (see lvs(8))
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
      # we don't want to report such volumes as existing, since they
      # don't really hold data
    lvs[name] = (size, inactive, online)
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  groups = utils.ListVolumeGroups()
  return groups
665 """List all volumes on this node.
669 A list of dictionaries, each having four keys:
670 - name: the logical volume name,
671 - size: the size of the logical volume
672 - dev: the physical device on which the LV lives
673 - vg: the volume group to which it belongs
675 In case of errors, we return an empty list and log the
678 Note that since a logical volume can live on multiple physical
679 volumes, the resulting list might include a logical volume
683 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
685 "--options=lv_name,lv_size,devices,vg_name"])
687 _Fail("Failed to list logical volumes, lvs output: %s",
691 return dev.split('(')[0]
694 return [parse_dev(x) for x in dev.split(",")]
697 line = [v.strip() for v in line]
698 return [{'name': line[0], 'size': line[1],
699 'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
702 for line in result.stdout.splitlines():
703 if line.count('|') >= 3:
704 all_devs.extend(map_line(line.split('|')))
706 logging.warning("Strange line in the output from lvs: '%s'", line)
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: bridge names to check for
  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise
  @raise RPCFail: if any bridge is missing (via _Fail)

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

    _Fail("Missing bridges %s", utils.CommaJoin(missing))
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information
  @rtype: list
  @return: a list of all running instances on the current node
      - instance1.example.com
      - instance2.example.com

  """
  for hname in hypervisor_list:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance
  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # iinfo is a hypervisor tuple; indices 2/4/5 are memory/state/cpu-time
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  NOTE(review): the visible implementation never builds the documented
  tuple — it raises via _Fail on a non-running instance and otherwise
  only logs warnings; confirm the docstring against the actual contract.

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # each disk must have its /var/run symlink for migration to work
  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data
  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
      for name, _, memory, vcpus, state, times in iinfo:
          # an instance may be reported by more than one hypervisor;
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  timestamp = utils.TimestampForFilename()
  base = "%s-%s-%s-%s.log" % (kind, os_name, instance, timestamp)
  return utils.PathJoin(constants.LOG_OS_DIR, base)
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
    # tell the OS create script it is re-installing, not creating fresh
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
    # include the tail of the script's log in the failure message
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

    # NOTE(review): log message says "os create command" although this is
    # the rename script — looks like a copy-paste; consider fixing
    logging.error("os create command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def _GetVGInfo(vg_name):
  """Get information about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
      A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

      If an error occurs during gathering of data, we return the same dict
      with keys all set to None.

  """
  # default result: all keys present but None, returned on any failure
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
    logging.error("volume group %s not present", vg_name)
  # expected stdout: "size:free:pv_count" (colon-separated, MiB units)
  valarr = retval.stdout.strip().rstrip(':').split(':')
      "vg_size": int(round(float(valarr[0]), 0)),
      "vg_free": int(round(float(valarr[1]), 0)),
      "pv_count": int(valarr[2]),
    except (TypeError, ValueError), err:
      logging.exception("Fail to parse vgs output: %s", err)
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the symlink path for the given instance's disk index.

  """
  link_basename = "%s:%d" % (instance_name, idx)
  return utils.PathJoin(constants.DISK_LINKS_DIR, link_basename)
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    os.symlink(device_path, link_name)
    if err.errno == errno.EEXIST:
      # a link already exists: replace it only if it points elsewhere
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  Best-effort: failures to remove individual links are logged, not raised.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
        os.remove(link_name)
        logging.exception("Can't remove symlink '%s'", link_name)
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we shoul assemble
  @rtype: list
  @return: list of (disk_object, device_path)
  @raise errors.BlockDeviceError: if a device is missing or a symlink
      cannot be created

  """
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
    block_devices.append((disk, link_name))

  return block_devices
def StartInstance(instance):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    # idempotent: starting an already-running instance is a no-op
    logging.info("Instance %s already running, not starting", instance.name)

    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    # undo the disk symlinks before reporting the failure
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)
def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    # already stopped: nothing to do
    logging.info("Instance %s not running, doing nothing", iname)

      # callable used with utils.Retry below; tracks whether a stop
      # attempt has already been made so the hypervisor can retry harder
      self.tried_once = False

      if iname not in hyper.ListInstances():
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    # cleanup failure is not fatal, only logged
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)
def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
      # hard reboot == full stop + start at the hypervisor level
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
    _Fail("Invalid reboot_type received: %s", reboot_type)
def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1195 def AcceptInstance(instance, info, target):
1196 """Prepare the node to accept an instance.
1198 @type instance: L{objects.Instance}
1199 @param instance: the instance definition
1200 @type info: string/data (opaque)
1201 @param info: migration information, from the source node
1202 @type target: string
1203 @param target: target host (usually ip), on this node
1206 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1208 hyper.AcceptInstance(instance, info, target)
1209 except errors.HypervisorError, err:
1210 _Fail("Failed to accept instance: %s", err, exc=True)
1213 def FinalizeMigration(instance, info, success):
1214 """Finalize any preparation to accept an instance.
1216 @type instance: L{objects.Instance}
1217 @param instance: the instance definition
1218 @type info: string/data (opaque)
1219 @param info: migration information, from the source node
1220 @type success: boolean
1221 @param success: whether the migration was a success or a failure
1224 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1226 hyper.FinalizeMigration(instance, info, success)
1227 except errors.HypervisorError, err:
1228 _Fail("Failed to finalize migration: %s", err, exc=True)
1231 def MigrateInstance(instance, target, live):
1232 """Migrates an instance to another node.
1234 @type instance: L{objects.Instance}
1235 @param instance: the instance definition
1236 @type target: string
1237 @param target: the target node name
1239 @param live: whether the migration should be done live or not (the
1240 interpretation of this parameter is left to the hypervisor)
1242 @return: a tuple of (success, msg) where:
1243 - succes is a boolean denoting the success/failure of the operation
1244 - msg is a string with details in case of failure
1247 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1250 hyper.MigrateInstance(instance, target, live)
1251 except errors.HypervisorError, err:
1252 _Fail("Failed to migrate instance: %s", err, exc=True)
1255 def BlockdevCreate(disk, size, owner, on_primary, info):
1256 """Creates a block device for an instance.
1258 @type disk: L{objects.Disk}
1259 @param disk: the object describing the disk we should create
1261 @param size: the size of the physical underlying device, in MiB
1263 @param owner: the name of the instance for which disk is created,
1264 used for device cache data
1265 @type on_primary: boolean
1266 @param on_primary: indicates if it is the primary node or not
1268 @param info: string that will be sent to the physical device
1269 creation, used for example to set (LVM) tags on LVs
1271 @return: the new unique_id of the device (this can sometime be
1272 computed only after creation), or None. On secondary nodes,
1273 it's not required to return anything.
1276 # TODO: remove the obsolete 'size' argument
1277 # pylint: disable-msg=W0613
1280 for child in disk.children:
1282 crdev = _RecursiveAssembleBD(child, owner, on_primary)
1283 except errors.BlockDeviceError, err:
1284 _Fail("Can't assemble device %s: %s", child, err)
1285 if on_primary or disk.AssembleOnSecondary():
1286 # we need the children open in case the device itself has to
1289 # pylint: disable-msg=E1103
1291 except errors.BlockDeviceError, err:
1292 _Fail("Can't make child '%s' read-write: %s", child, err)
1296 device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1297 except errors.BlockDeviceError, err:
1298 _Fail("Can't create block device: %s", err)
1300 if on_primary or disk.AssembleOnSecondary():
1303 except errors.BlockDeviceError, err:
1304 _Fail("Can't assemble device after creation, unusual event: %s", err)
1305 device.SetSyncSpeed(constants.SYNC_SPEED)
1306 if on_primary or disk.OpenOnSecondary():
1308 device.Open(force=True)
1309 except errors.BlockDeviceError, err:
1310 _Fail("Can't make device r/w after creation, unusual event: %s", err)
1311 DevCacheManager.UpdateCache(device.dev_path, owner,
1312 on_primary, disk.iv_name)
1314 device.SetInfo(info)
1316 return device.unique_id
1319 def BlockdevRemove(disk):
1320 """Remove a block device.
1322 @note: This is intended to be called recursively.
1324 @type disk: L{objects.Disk}
1325 @param disk: the disk object we should remove
1327 @return: the success of the operation
1332 rdev = _RecursiveFindBD(disk)
1333 except errors.BlockDeviceError, err:
1334 # probably can't attach
1335 logging.info("Can't attach to device %s in remove", disk)
1337 if rdev is not None:
1338 r_path = rdev.dev_path
1341 except errors.BlockDeviceError, err:
1342 msgs.append(str(err))
1344 DevCacheManager.RemoveCache(r_path)
1347 for child in disk.children:
1349 BlockdevRemove(child)
1350 except RPCFail, err:
1351 msgs.append(str(err))
1354 _Fail("; ".join(msgs))
1357 def _RecursiveAssembleBD(disk, owner, as_primary):
1358 """Activate a block device for an instance.
1360 This is run on the primary and secondary nodes for an instance.
1362 @note: this function is called recursively.
1364 @type disk: L{objects.Disk}
1365 @param disk: the disk we try to assemble
1367 @param owner: the name of the instance which owns the disk
1368 @type as_primary: boolean
1369 @param as_primary: if we should make the block device
1372 @return: the assembled device or None (in case no device
1374 @raise errors.BlockDeviceError: in case there is an error
1375 during the activation of the children or the device
1381 mcn = disk.ChildrenNeeded()
1383 mcn = 0 # max number of Nones allowed
1385 mcn = len(disk.children) - mcn # max number of Nones
1386 for chld_disk in disk.children:
1388 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1389 except errors.BlockDeviceError, err:
1390 if children.count(None) >= mcn:
1393 logging.error("Error in child activation (but continuing): %s",
1395 children.append(cdev)
1397 if as_primary or disk.AssembleOnSecondary():
1398 r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1399 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1401 if as_primary or disk.OpenOnSecondary():
1403 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1404 as_primary, disk.iv_name)
1411 def BlockdevAssemble(disk, owner, as_primary):
1412 """Activate a block device for an instance.
1414 This is a wrapper over _RecursiveAssembleBD.
1416 @rtype: str or boolean
1417 @return: a C{/dev/...} path for primary nodes, and
1418 C{True} for secondary nodes
1422 result = _RecursiveAssembleBD(disk, owner, as_primary)
1423 if isinstance(result, bdev.BlockDev):
1424 # pylint: disable-msg=E1103
1425 result = result.dev_path
1426 except errors.BlockDeviceError, err:
1427 _Fail("Error while assembling disk: %s", err, exc=True)
1432 def BlockdevShutdown(disk):
1433 """Shut down a block device.
1435 First, if the device is assembled (Attach() is successful), then
1436 the device is shutdown. Then the children of the device are
1439 This function is called recursively. Note that we don't cache the
1440 children or such, as oppossed to assemble, shutdown of different
1441 devices doesn't require that the upper device was active.
1443 @type disk: L{objects.Disk}
1444 @param disk: the description of the disk we should
1450 r_dev = _RecursiveFindBD(disk)
1451 if r_dev is not None:
1452 r_path = r_dev.dev_path
1455 DevCacheManager.RemoveCache(r_path)
1456 except errors.BlockDeviceError, err:
1457 msgs.append(str(err))
1460 for child in disk.children:
1462 BlockdevShutdown(child)
1463 except RPCFail, err:
1464 msgs.append(str(err))
1467 _Fail("; ".join(msgs))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  # the parent must already be assembled for us to extend it
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  # resolve every new child; any unresolvable child aborts the operation
  new_bdevs = []
  for cdev in new_cdevs:
    new_bdevs.append(_RecursiveFindBD(cdev))
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      # no static path known, look the device up to get its path
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open
  @return: the assembled and opened block device
  @raise RPCFail: if the device is not set up

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk
1578 def BlockdevFind(disk):
1579 """Check if a device is activated.
1581 If it is, return information about the real device.
1583 @type disk: L{objects.Disk}
1584 @param disk: the disk to find
1585 @rtype: None or objects.BlockDevStatus
1586 @return: None if the disk cannot be found, otherwise a the current
1591 rbd = _RecursiveFindBD(disk)
1592 except errors.BlockDeviceError, err:
1593 _Fail("Failed to find device: %s", err, exc=True)
1598 return rbd.GetSyncStatus()
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      # the device can't even be attached to; report it as missing
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
1716 def _ErrnoOrStr(err):
1717 """Format an EnvironmentError exception.
1719 If the L{err} argument has an errno attribute, it will be looked up
1720 and converted into a textual C{E...} description. Otherwise the
1721 string representation of the error will be returned.
1723 @type err: L{EnvironmentError}
1724 @param err: the exception to format
1727 if hasattr(err, 'errno'):
1728 detail = errno.errorcode[err.errno]
1734 def _OSOndiskAPIVersion(os_dir):
1735 """Compute and return the API version of a given OS.
1737 This function will try to read the API version of the OS residing in
1738 the 'os_dir' directory.
1741 @param os_dir: the directory in which we should look for the OS
1743 @return: tuple (status, data) with status denoting the validity and
1744 data holding either the vaid versions or an error message
1747 api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1750 st = os.stat(api_file)
1751 except EnvironmentError, err:
1752 return False, ("Required file '%s' not found under path %s: %s" %
1753 (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1755 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1756 return False, ("File '%s' in %s is not a regular file" %
1757 (constants.OS_API_FILE, os_dir))
1760 api_versions = utils.ReadFile(api_file).splitlines()
1761 except EnvironmentError, err:
1762 return False, ("Error while reading the API version file at %s: %s" %
1763 (api_file, _ErrnoOrStr(err)))
1766 api_versions = [int(version.strip()) for version in api_versions]
1767 except (TypeError, ValueError), err:
1768 return False, ("API version(s) can't be converted to integer: %s" %
1771 return True, api_versions
1774 def DiagnoseOS(top_dirs=None):
1775 """Compute the validity for all OSes.
1777 @type top_dirs: list
1778 @param top_dirs: the list of directories in which to
1779 search (if not given defaults to
1780 L{constants.OS_SEARCH_PATH})
1781 @rtype: list of L{objects.OS}
1782 @return: a list of tuples (name, path, status, diagnose, variants,
1783 parameters, api_version) for all (potential) OSes under all
1784 search paths, where:
1785 - name is the (potential) OS name
1786 - path is the full path to the OS
1787 - status True/False is the validity of the OS
1788 - diagnose is the error message for an invalid OS, otherwise empty
1789 - variants is a list of supported OS variants, if any
1790 - parameters is a list of (name, help) parameters, if any
1791 - api_version is a list of support OS API versions
1794 if top_dirs is None:
1795 top_dirs = constants.OS_SEARCH_PATH
1798 for dir_name in top_dirs:
1799 if os.path.isdir(dir_name):
1801 f_names = utils.ListVisibleFiles(dir_name)
1802 except EnvironmentError, err:
1803 logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1805 for name in f_names:
1806 os_path = utils.PathJoin(dir_name, name)
1807 status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1810 variants = os_inst.supported_variants
1811 parameters = os_inst.supported_parameters
1812 api_versions = os_inst.api_versions
1815 variants = parameters = api_versions = []
1816 result.append((name, os_path, status, diagnose, variants,
1817 parameters, api_versions))
1822 def _TryOSFromDisk(name, base_dir=None):
1823 """Create an OS instance from disk.
1825 This function will return an OS instance if the given name is a
1828 @type base_dir: string
1829 @keyword base_dir: Base directory containing OS installations.
1830 Defaults to a search in all the OS_SEARCH_PATH dirs.
1832 @return: success and either the OS instance if we find a valid one,
1836 if base_dir is None:
1837 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1839 os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1842 return False, "Directory for OS %s not found in search path" % name
1844 status, api_versions = _OSOndiskAPIVersion(os_dir)
1847 return status, api_versions
1849 if not constants.OS_API_VERSIONS.intersection(api_versions):
1850 return False, ("API version mismatch for path '%s': found %s, want %s." %
1851 (os_dir, api_versions, constants.OS_API_VERSIONS))
1853 # OS Files dictionary, we will populate it with the absolute path names
1854 os_files = dict.fromkeys(constants.OS_SCRIPTS)
1856 if max(api_versions) >= constants.OS_API_V15:
1857 os_files[constants.OS_VARIANTS_FILE] = ''
1859 if max(api_versions) >= constants.OS_API_V20:
1860 os_files[constants.OS_PARAMETERS_FILE] = ''
1862 del os_files[constants.OS_SCRIPT_VERIFY]
1864 for filename in os_files:
1865 os_files[filename] = utils.PathJoin(os_dir, filename)
1868 st = os.stat(os_files[filename])
1869 except EnvironmentError, err:
1870 return False, ("File '%s' under path '%s' is missing (%s)" %
1871 (filename, os_dir, _ErrnoOrStr(err)))
1873 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1874 return False, ("File '%s' under path '%s' is not a regular file" %
1877 if filename in constants.OS_SCRIPTS:
1878 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1879 return False, ("File '%s' under path '%s' is not executable" %
1883 if constants.OS_VARIANTS_FILE in os_files:
1884 variants_file = os_files[constants.OS_VARIANTS_FILE]
1886 variants = utils.ReadFile(variants_file).splitlines()
1887 except EnvironmentError, err:
1888 return False, ("Error while reading the OS variants file at %s: %s" %
1889 (variants_file, _ErrnoOrStr(err)))
1891 return False, ("No supported os variant found")
1894 if constants.OS_PARAMETERS_FILE in os_files:
1895 parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1897 parameters = utils.ReadFile(parameters_file).splitlines()
1898 except EnvironmentError, err:
1899 return False, ("Error while reading the OS parameters file at %s: %s" %
1900 (parameters_file, _ErrnoOrStr(err)))
1901 parameters = [v.split(None, 1) for v in parameters]
1903 os_obj = objects.OS(name=name, path=os_dir,
1904 create_script=os_files[constants.OS_SCRIPT_CREATE],
1905 export_script=os_files[constants.OS_SCRIPT_EXPORT],
1906 import_script=os_files[constants.OS_SCRIPT_IMPORT],
1907 rename_script=os_files[constants.OS_SCRIPT_RENAME],
1908 verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1910 supported_variants=variants,
1911 supported_parameters=parameters,
1912 api_versions=api_versions)
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type name: string
  @param name: the OS name to look up (possibly including a variant
      after a "+" separator, which is stripped for the lookup)
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = name.split("+", 1)[0]
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload
def OSCoreEnv(inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['OS_NAME'] = inst_os.name
  result['DEBUG_LEVEL'] = '%d' % debug

  # OS variants
  if api_version >= constants.OS_API_V15:
    try:
      # the variant, if any, is encoded after a "+" in the OS name
      variant = inst_os.name.split('+', 1)[1]
    except IndexError:
      variant = inst_os.supported_variants[0]
    result['OS_VARIANT'] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result['OSP_%s' % pname.upper()] = pvalue

  return result
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(inst_os, instance.osparams, debug=debug)

  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
2039 def BlockdevGrow(disk, amount):
2040 """Grow a stack of block devices.
2042 This function is called recursively, with the childrens being the
2043 first ones to resize.
2045 @type disk: L{objects.Disk}
2046 @param disk: the disk to be grown
2047 @rtype: (status, result)
2048 @return: a tuple with the status of the operation
2049 (True/False), and the errors message if status
2053 r_dev = _RecursiveFindBD(disk)
2055 _Fail("Cannot find block device %s", disk)
2059 except errors.BlockDeviceError, err:
2060 _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.dev_type == constants.LD_DRBD8:
    # snapshot the local LVM data underneath the DRBD device
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file
  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
                 '%s' % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # atomically replace any previous export with the same name
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
2197 """Return a list of exports currently available on this machine.
2200 @return: list of the exports
2203 if os.path.isdir(constants.EXPORT_DIR):
2204 return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2206 _Fail("No exports directory")
2209 def RemoveExport(export):
2210 """Remove an existing export from the node.
2213 @param export: the name of the export to remove
2217 target = utils.PathJoin(constants.EXPORT_DIR, export)
2220 shutil.rmtree(target)
2221 except EnvironmentError, err:
2222 _Fail("Error while removing the export: %s", err, exc=True)
2225 def BlockdevRename(devlist):
2226 """Rename a list of block devices.
2228 @type devlist: list of tuples
2229 @param devlist: list of tuples of the form (disk,
2230 new_logical_id, new_physical_id); disk is an
2231 L{objects.Disk} object describing the current disk,
2232 and new logical_id/physical_id is the name we
2235 @return: True if all renames succeeded, False otherwise
2240 for disk, unique_id in devlist:
2241 dev = _RecursiveFindBD(disk)
2243 msgs.append("Can't find device %s in rename" % str(disk))
2247 old_rpath = dev.dev_path
2248 dev.Rename(unique_id)
2249 new_rpath = dev.dev_path
2250 if old_rpath != new_rpath:
2251 DevCacheManager.RemoveCache(old_rpath)
2252 # FIXME: we should add the new cache information here, like:
2253 # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2254 # but we don't have the owner here - maybe parse from existing
2255 # cache? for now, we only lose lvm data when we rename, which
2256 # is less critical than DRBD or MD
2257 except errors.BlockDeviceError, err:
2258 msgs.append("Can't rename device '%s' to '%s': %s" %
2259 (dev, unique_id, err))
2260 logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2263 _Fail("; ".join(msgs))
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks wheter the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check
  @rtype: str
  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  # the base directory comes from the cluster configuration
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
      base_file_storage_dir):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
2291 def CreateFileStorageDir(file_storage_dir):
2292 """Create file storage directory.
2294 @type file_storage_dir: str
2295 @param file_storage_dir: directory to create
2298 @return: tuple with first element a boolean indicating wheter dir
2299 creation was successful or not
2302 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2303 if os.path.exists(file_storage_dir):
2304 if not os.path.isdir(file_storage_dir):
2305 _Fail("Specified storage dir '%s' is not a directory",
2309 os.makedirs(file_storage_dir, 0750)
2310 except OSError, err:
2311 _Fail("Cannot create file storage directory '%s': %s",
2312 file_storage_dir, err, exc=True)
2315 def RemoveFileStorageDir(file_storage_dir):
2316 """Remove file storage directory.
2318 Remove it only if it's empty. If not log an error and return.
2320 @type file_storage_dir: str
2321 @param file_storage_dir: the directory we should cleanup
2322 @rtype: tuple (success,)
2323 @return: tuple of one element, C{success}, denoting
2324 whether the operation was successful
2327 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2328 if os.path.exists(file_storage_dir):
2329 if not os.path.isdir(file_storage_dir):
2330 _Fail("Specified Storage directory '%s' is not a directory",
2332 # deletes dir only if empty, otherwise we want to fail the rpc call
2334 os.rmdir(file_storage_dir)
2335 except OSError, err:
2336 _Fail("Cannot remove file storage directory '%s': %s",
2337 file_storage_dir, err)
2340 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2341 """Rename the file storage directory.
2343 @type old_file_storage_dir: str
2344 @param old_file_storage_dir: the current path
2345 @type new_file_storage_dir: str
2346 @param new_file_storage_dir: the name we should rename to
2347 @rtype: tuple (success,)
2348 @return: tuple of one element, C{success}, denoting
2349 whether the operation was successful
2352 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2353 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2354 if not os.path.exists(new_file_storage_dir):
2355 if os.path.isdir(old_file_storage_dir):
2357 os.rename(old_file_storage_dir, new_file_storage_dir)
2358 except OSError, err:
2359 _Fail("Cannot rename '%s' to '%s': %s",
2360 old_file_storage_dir, new_file_storage_dir, err)
2362 _Fail("Specified storage dir '%s' is not a directory",
2363 old_file_storage_dir)
2365 if os.path.exists(old_file_storage_dir):
2366 _Fail("Cannot rename '%s' to '%s': both locations exist",
2367 old_file_storage_dir, new_file_storage_dir)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  # both source and destination must live inside the queue directory
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)
2426 def BlockdevClose(instance_name, disks):
2427 """Closes the given block devices.
2429 This means they will be switched to secondary mode (in case of
2432 @param instance_name: if the argument is not empty, the symlinks
2433 of this instance will be removed
2434 @type disks: list of L{objects.Disk}
2435 @param disks: the list of disks to be closed
2436 @rtype: tuple (success, message)
2437 @return: a tuple of success and message, where success
2438 indicates the succes of the operation, and message
2439 which will contain the error details in case we
2445 rd = _RecursiveFindBD(cf)
2447 _Fail("Can't find device %s", cf)
2454 except errors.BlockDeviceError, err:
2455 msg.append(str(err))
2457 _Fail("Can't make devices secondary: %s", ",".join(msg))
2460 _RemoveBlockDevLinks(instance_name, disks)
2463 def ValidateHVParams(hvname, hvparams):
2464 """Validates the given hypervisor parameters.
2466 @type hvname: string
2467 @param hvname: the hypervisor name
2468 @type hvparams: dict
2469 @param hvparams: the hypervisor parameters to be validated
2474 hv_type = hypervisor.GetHypervisor(hvname)
2475 hv_type.ValidateParameters(hvparams)
2476 except errors.HypervisorError, err:
2477 _Fail(str(err), log=False)
def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  # variants are handled by the base OS definition
  name_only = osname.split("+", 1)[0]
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  # older OS API versions don't support validation
  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True
2545 """Demotes the current node from master candidate role.
2548 # try to ensure we're not the master by mistake
2549 master, myself = ssconf.GetMasterAndMyself()
2550 if master == myself:
2551 _Fail("ssconf status shows I'm the master node, will not demote")
2553 result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2554 if not result.failed:
2555 _Fail("The master daemon is running, will not demote")
2558 if os.path.isfile(constants.CLUSTER_CONF_FILE):
2559 utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2560 except EnvironmentError, err:
2561 if err.errno != errno.ENOENT:
2562 _Fail("Error while backing up cluster file: %s", err, exc=True)
2564 utils.RemoveFile(constants.CLUSTER_CONF_FILE)
def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  @type cryptodir: string
  @param cryptodir: directory holding the certificate directories
  @type name: string
  @param name: certificate (directory) name
  @rtype: tuple; (string, string, string)
  @return: directory, key file path and certificate file path

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2576 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2577 """Creates a new X509 certificate for SSL/TLS.
2580 @param validity: Validity in seconds
2581 @rtype: tuple; (string, string)
2582 @return: Certificate name and public part
2585 (key_pem, cert_pem) = \
2586 utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2587 min(validity, _MAX_SSL_CERT_VALIDITY))
2589 cert_dir = tempfile.mkdtemp(dir=cryptodir,
2590 prefix="x509-%s-" % utils.TimestampForFilename())
2592 name = os.path.basename(cert_dir)
2593 assert len(name) > 5
2595 (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2597 utils.WriteFile(key_file, mode=0400, data=key_pem)
2598 utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2600 # Never return private key as it shouldn't leave the node
2601 return (name, cert_pem)
2603 shutil.rmtree(cert_dir, ignore_errors=True)
2607 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2608 """Removes a X509 certificate.
2611 @param name: Certificate name
2614 (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2616 utils.RemoveFile(key_file)
2617 utils.RemoveFile(cert_file)
2621 except EnvironmentError, err:
2622 _Fail("Cannot remove certificate directory '%s': %s",
2626 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2627 """Returns the command for the requested input/output.
2629 @type instance: L{objects.Instance}
2630 @param instance: The instance object
2631 @param mode: Import/export mode
2632 @param ieio: Input/output type
2633 @param ieargs: Input/output arguments
2636 assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2643 if ieio == constants.IEIO_FILE:
2644 (filename, ) = ieargs
2646 if not utils.IsNormAbsPath(filename):
2647 _Fail("Path '%s' is not normalized or absolute", filename)
2649 directory = os.path.normpath(os.path.dirname(filename))
2651 if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2652 constants.EXPORT_DIR):
2653 _Fail("File '%s' is not under exports directory '%s'",
2654 filename, constants.EXPORT_DIR)
2657 utils.Makedirs(directory, mode=0750)
2659 quoted_filename = utils.ShellQuote(filename)
2661 if mode == constants.IEM_IMPORT:
2662 suffix = "> %s" % quoted_filename
2663 elif mode == constants.IEM_EXPORT:
2664 suffix = "< %s" % quoted_filename
2666 # Retrieve file size
2668 st = os.stat(filename)
2669 except EnvironmentError, err:
2670 logging.error("Can't stat(2) %s: %s", filename, err)
2672 exp_size = utils.BytesToMebibyte(st.st_size)
2674 elif ieio == constants.IEIO_RAW_DISK:
2677 real_disk = _OpenRealBD(disk)
2679 if mode == constants.IEM_IMPORT:
2680 # we set here a smaller block size as, due to transport buffering, more
2681 # than 64-128k will mostly ignored; we use nocreat to fail if the device
2682 # is not already there or we pass a wrong path; we use notrunc to no
2683 # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2684 # much memory; this means that at best, we flush every 64k, which will
2686 suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2687 " bs=%s oflag=dsync"),
2691 elif mode == constants.IEM_EXPORT:
2692 # the block size on the read dd is 1MiB to match our units
2693 prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2695 str(1024 * 1024), # 1 MB
2697 exp_size = disk.size
2699 elif ieio == constants.IEIO_SCRIPT:
2700 (disk, disk_index, ) = ieargs
2702 assert isinstance(disk_index, (int, long))
2704 real_disk = _OpenRealBD(disk)
2706 inst_os = OSFromDisk(instance.os)
2707 env = OSEnvironment(instance, inst_os)
2709 if mode == constants.IEM_IMPORT:
2710 env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2711 env["IMPORT_INDEX"] = str(disk_index)
2712 script = inst_os.import_script
2714 elif mode == constants.IEM_EXPORT:
2715 env["EXPORT_DEVICE"] = real_disk.dev_path
2716 env["EXPORT_INDEX"] = str(disk_index)
2717 script = inst_os.export_script
2719 # TODO: Pass special environment only to script
2720 script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2722 if mode == constants.IEM_IMPORT:
2723 suffix = "| %s" % script_cmd
2725 elif mode == constants.IEM_EXPORT:
2726 prefix = "%s |" % script_cmd
2728 # Let script predict size
2729 exp_size = constants.IE_CUSTOM_SIZE
2732 _Fail("Invalid %s I/O mode %r", mode, ieio)
2734 return (env, prefix, suffix, exp_size)
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  @type prefix: string
  @param prefix: either "import" or "export", used in the directory name
  @rtype: string
  @return: path of the newly created status directory

  """
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))
2746 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2747 """Starts an import or export daemon.
2749 @param mode: Import/output mode
2750 @type opts: L{objects.ImportExportOptions}
2751 @param opts: Daemon options
2753 @param host: Remote host for export (None for import)
2755 @param port: Remote port for export (None for import)
2756 @type instance: L{objects.Instance}
2757 @param instance: Instance object
2758 @param ieio: Input/output type
2759 @param ieioargs: Input/output arguments
2762 if mode == constants.IEM_IMPORT:
2765 if not (host is None and port is None):
2766 _Fail("Can not specify host or port on import")
2768 elif mode == constants.IEM_EXPORT:
2771 if host is None or port is None:
2772 _Fail("Host and port must be specified for an export")
2775 _Fail("Invalid mode %r", mode)
2777 if (opts.key_name is None) ^ (opts.ca_pem is None):
2778 _Fail("Cluster certificate can only be used for both key and CA")
2780 (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2781 _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2783 if opts.key_name is None:
2785 key_path = constants.NODED_CERT_FILE
2786 cert_path = constants.NODED_CERT_FILE
2787 assert opts.ca_pem is None
2789 (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2791 assert opts.ca_pem is not None
2793 for i in [key_path, cert_path]:
2794 if not os.path.exists(i):
2795 _Fail("File '%s' does not exist" % i)
2797 status_dir = _CreateImportExportStatusDir(prefix)
2799 status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2800 pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2801 ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2803 if opts.ca_pem is None:
2805 ca = utils.ReadFile(constants.NODED_CERT_FILE)
2810 utils.WriteFile(ca_file, data=ca, mode=0400)
2813 constants.IMPORT_EXPORT_DAEMON,
2815 "--key=%s" % key_path,
2816 "--cert=%s" % cert_path,
2817 "--ca=%s" % ca_file,
2821 cmd.append("--host=%s" % host)
2824 cmd.append("--port=%s" % port)
2827 cmd.append("--compress=%s" % opts.compress)
2830 cmd.append("--magic=%s" % opts.magic)
2832 if exp_size is not None:
2833 cmd.append("--expected-size=%s" % exp_size)
2836 cmd.append("--cmd-prefix=%s" % cmd_prefix)
2839 cmd.append("--cmd-suffix=%s" % cmd_suffix)
2841 logfile = _InstanceLogName(prefix, instance.os, instance.name)
2843 # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2844 # support for receiving a file descriptor for output
2845 utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2848 # The import/export name is simply the status directory name
2849 return os.path.basename(status_dir)
2852 shutil.rmtree(status_dir, ignore_errors=True)
2856 def GetImportExportStatus(names):
2857 """Returns import/export daemon status.
2859 @type names: sequence
2860 @param names: List of names
2861 @rtype: List of dicts
2862 @return: Returns a list of the state of each named import/export or None if a
2863 status couldn't be read
2869 status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2873 data = utils.ReadFile(status_file)
2874 except EnvironmentError, err:
2875 if err.errno != errno.ENOENT:
2883 result.append(serializer.LoadJson(data))
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  @type name: string
  @param name: the import/export name (status directory basename)

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    # the process might already be gone; that's not an error
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  @type name: string
  @param name: the import/export name (status directory basename)

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @param nodes_ip: mapping used by L{objects.Disk.SetPhysicalID}
  @param disks: list of L{objects.Disk} objects to resolve
  @return: list of found block devices; fails via L{_Fail} on a miss

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs
2943 def DrbdDisconnectNet(nodes_ip, disks):
2944 """Disconnects the network on a list of drbd devices.
2947 bdevs = _FindDisks(nodes_ip, disks)
2953 except errors.BlockDeviceError, err:
2954 _Fail("Can't change network configuration to standalone mode: %s",
2958 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2959 """Attaches the network on a list of drbd devices.
2962 bdevs = _FindDisks(nodes_ip, disks)
2965 for idx, rd in enumerate(bdevs):
2967 _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2968 except EnvironmentError, err:
2969 _Fail("Can't create symlink: %s", err)
2970 # reconnect disks, switch to new master configuration and if
2971 # needed primary mode
2974 rd.AttachNet(multimaster)
2975 except errors.BlockDeviceError, err:
2976 _Fail("Can't change network configuration: %s", err)
2978 # wait until the disks are connected; we need to retry the re-attach
2979 # if the device becomes standalone, as this might happen if the one
2980 # node disconnects and reconnects in a different mode before the
2981 # other node reconnects; in this case, one or both of the nodes will
2982 # decide it has wrong configuration and switch to standalone
2985 all_connected = True
2988 stats = rd.GetProcStatus()
2990 all_connected = (all_connected and
2991 (stats.is_connected or stats.is_in_resync))
2993 if stats.is_standalone:
2994 # peer had different config info and this node became
2995 # standalone, even though this should not happen with the
2996 # new staged way of changing disk configs
2998 rd.AttachNet(multimaster)
2999 except errors.BlockDeviceError, err:
3000 _Fail("Can't change network configuration: %s", err)
3002 if not all_connected:
3003 raise utils.RetryAgain()
3006 # Start with a delay of 100 miliseconds and go up to 5 seconds
3007 utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3008 except utils.RetryTimeout:
3009 _Fail("Timeout in disk reconnecting")
3012 # change to primary mode
3016 except errors.BlockDeviceError, err:
3017 _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @param nodes_ip: node IP mapping, passed to L{_FindDisks}
  @param disks: list of L{objects.Disk} objects
  @rtype: tuple (boolean, int)
  @return: whether all devices are done syncing, and the minimum
      sync percentage seen across all devices

  """
  def _helper(rd):
    # raises RetryAgain until the device is at least connected/resyncing
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check before we give up on this device
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
3050 def GetDrbdUsermodeHelper():
3051 """Returns DRBD usermode helper currently configured.
3055 return bdev.BaseDRBD.GetUsermodeHelper()
3056 except errors.BlockDeviceError, err:
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  @type hypervisor_type: string
  @param hypervisor_type: hypervisor whose powercycle method is used

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running on ram
  try:
    utils.Mlockall()
  except Exception: # pylint: disable-msg=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script
    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      # always remove the temporary input file
      os.unlink(fin_name)

    return result.stdout
3206 class DevCacheManager(object):
3207 """Simple class for managing a cache of block device information.
3210 _DEV_PREFIX = "/dev/"
3211 _ROOT_DIR = constants.BDEV_CACHE_DIR
3214 def _ConvertPath(cls, dev_path):
3215 """Converts a /dev/name path to the cache file name.
3217 This replaces slashes with underscores and strips the /dev
3218 prefix. It then returns the full path to the cache file.
3221 @param dev_path: the C{/dev/} path name
3223 @return: the converted path name
3226 if dev_path.startswith(cls._DEV_PREFIX):
3227 dev_path = dev_path[len(cls._DEV_PREFIX):]
3228 dev_path = dev_path.replace("/", "_")
3229 fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3233 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3234 """Updates the cache information for a given device.
3237 @param dev_path: the pathname of the device
3239 @param owner: the owner (instance name) of the device
3240 @type on_primary: bool
3241 @param on_primary: whether this is the primary
3244 @param iv_name: the instance-visible name of the
3245 device, as in objects.Disk.iv_name
3250 if dev_path is None:
3251 logging.error("DevCacheManager.UpdateCache got a None dev_path")
3253 fpath = cls._ConvertPath(dev_path)
3259 iv_name = "not_visible"
3260 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3262 utils.WriteFile(fpath, data=fdata)
3263 except EnvironmentError, err:
3264 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3267 def RemoveCache(cls, dev_path):
3268 """Remove data for a dev_path.
3270 This is just a wrapper over L{utils.RemoveFile} with a converted
3271 path name and logging.
3274 @param dev_path: the pathname of the device
3279 if dev_path is None:
3280 logging.error("DevCacheManager.RemoveCache got a None dev_path")
3282 fpath = cls._ConvertPath(dev_path)
3284 utils.RemoveFile(fpath)
3285 except EnvironmentError, err:
3286 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)