4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25 the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27 in the L{_CleanDirectory} function
31 # pylint: disable-msg=E1103
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
# Kernel-generated boot UUID; read by GetNodeInfo to report "bootid".
63 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
# Whitelist of directories that _CleanDirectory may operate on (it
# refuses anything not in this set).
64 _ALLOWED_CLEAN_DIRS = frozenset([
66 constants.JOB_QUEUE_ARCHIVE_DIR,
68 constants.CRYPTO_KEYS_DIR,
# Maximum certificate validity: one week, expressed in seconds.
70 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
# Base file names for X509 key/cert material and the status file
# ("IES" presumably import/export status — TODO confirm against users).
71 _X509_KEY_FILE = "key"
72 _X509_CERT_FILE = "cert"
73 _IES_STATUS_FILE = "status"
# Raised via _Fail below; the node daemon treats it specially and turns
# it into a 'failed' RPC return value (see _Fail's docstring).
78 class RPCFail(Exception):
79 """Class denoting RPC failure.
81 Its argument is the error message.
86 def _Fail(msg, *args, **kwargs):
87 """Log an error and the raise an RPCFail exception.
89 This exception is then handled specially in the ganeti daemon and
90 turned into a 'failed' return type. As such, this function is a
91 useful shortcut for logging the error and returning it to the master
95 @param msg: the text of the exception
# kwargs["log"]=False suppresses logging entirely; kwargs["exc"]=True
# logs the currently-handled exception's traceback as well.
101 if "log" not in kwargs or kwargs["log"]: # if we should log this error
102 if "exc" in kwargs and kwargs["exc"]:
103 logging.exception(msg)
110 """Simple wrapper to return a SimpleStore.
112 @rtype: L{ssconf.SimpleStore}
113 @return: a SimpleStore instance
116 return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Build an SshRunner bound to the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: two-element sequence of (encoding, content) as sent by
      the RPC client
  @rtype: str
  @return: the decompressed payload
  @raise AssertionError: if the encoding marker is unknown

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    # Fix: the uncompressed payload must be returned as-is; this branch
    # previously had no return statement for the content.
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
152 def _CleanDirectory(path, exclude=None):
153 """Removes all regular files in a directory.
156 @param path: the directory to clean
158 @param exclude: list of files to be excluded, defaults
# Refuse to clean anything outside the module-level whitelist.
162 if path not in _ALLOWED_CLEAN_DIRS:
163 _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
166 if not os.path.isdir(path):
171 # Normalize excluded paths
172 exclude = [os.path.normpath(i) for i in exclude]
174 for rel_name in utils.ListVisibleFiles(path):
175 full_name = utils.PathJoin(path, rel_name)
176 if full_name in exclude:
# Only regular files are removed; symlinks and subdirectories are kept.
178 if os.path.isfile(full_name) and not os.path.islink(full_name):
179 utils.RemoveFile(full_name)
182 def _BuildUploadFileList():
183 """Build the list of allowed upload files.
185 This is abstracted so that it's built only once at module import time.
188 allowed_files = set([
189 constants.CLUSTER_CONF_FILE,
191 constants.SSH_KNOWN_HOSTS_FILE,
192 constants.VNC_PASSWORD_FILE,
193 constants.RAPI_CERT_FILE,
194 constants.RAPI_USERS_FILE,
195 constants.CONFD_HMAC_KEY,
196 constants.CLUSTER_DOMAIN_SECRET_FILE,
# Each configured hypervisor type contributes its own ancillary files.
199 for hv_name in constants.HYPER_TYPES:
200 hv_class = hypervisor.GetHypervisorClass(hv_name)
201 allowed_files.update(hv_class.GetAncillaryFiles())
203 return frozenset(allowed_files)
# Computed once at module import time (see _BuildUploadFileList).
206 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
210 """Removes job queue files and archived jobs.
216 _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
217 _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
221 """Returns master information.
223 This is an utility function to compute master information, either
224 for consumption here or from the node daemon.
227 @return: master_netdev, master_ip, master_name
228 @raise RPCFail: in case of errors
233 master_netdev = cfg.GetMasterNetdev()
234 master_ip = cfg.GetMasterIP()
235 master_node = cfg.GetMasterNode()
236 except errors.ConfigurationError, err:
237 _Fail("Cluster configuration incomplete: %s", err, exc=True)
238 return (master_netdev, master_ip, master_node)
241 def StartMaster(start_daemons, no_voting):
242 """Activate local node as master node.
244 The function will always try activate the IP address of the master
245 (unless someone else has it). It will also start the master daemons,
246 based on the start_daemons parameter.
248 @type start_daemons: boolean
249 @param start_daemons: whether to also start the master
250 daemons (ganeti-masterd and ganeti-rapi)
251 @type no_voting: boolean
252 @param no_voting: whether to start ganeti-masterd without a node vote
253 (if start_daemons is True), but still non-interactively
257 # GetMasterInfo will raise an exception if not able to return data
258 master_netdev, master_ip, _ = GetMasterInfo()
# If something answers on the master IP, check whether it is us.
261 if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
262 if utils.OwnIpAddress(master_ip):
263 # we already have the ip:
264 logging.debug("Master IP already configured, doing nothing")
266 msg = "Someone else has the master ip, not activating"
# Add the master IP as a /32 label alias on the master netdev.
270 result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
271 "dev", master_netdev, "label",
272 "%s:0" % master_netdev])
274 msg = "Can't activate master IP: %s" % result.output
# Gratuitous ARP so neighbours refresh their caches.
278 result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
279 "-s", master_ip, master_ip])
280 # we'll ignore the exit code of arping
282 # and now start the master and rapi daemons
# Starting without a vote requires the explicit confirmation flag.
285 masterd_args = "--no-voting --yes-do-it"
290 "EXTRA_MASTERD_ARGS": masterd_args,
293 result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
295 msg = "Can't start Ganeti master: %s" % result.output
# All accumulated error messages are reported in a single RPCFail.
300 _Fail("; ".join(err_msgs))
303 def StopMaster(stop_daemons):
304 """Deactivate this node as master.
306 The function will always try to deactivate the IP address of the
307 master. It will also stop the master daemons depending on the
308 stop_daemons parameter.
310 @type stop_daemons: boolean
311 @param stop_daemons: whether to also stop the master daemons
312 (ganeti-masterd and ganeti-rapi)
316 # TODO: log and report back to the caller the error failures; we
317 # need to decide in which case we fail the RPC for this
319 # GetMasterInfo will raise an exception if not able to return data
320 master_netdev, master_ip, _ = GetMasterInfo()
# Remove the master IP alias; failure is only logged, not fatal.
322 result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
323 "dev", master_netdev])
325 logging.error("Can't remove the master IP, error: %s", result.output)
326 # but otherwise ignore the failure
329 result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
331 logging.error("Could not stop Ganeti master, command %s had exitcode %s"
333 result.cmd, result.exit_code, result.output)
336 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
337 """Joins this node to the cluster.
339 This does the following:
340 - updates the hostkeys of the machine (rsa and dsa)
341 - adds the ssh private key to the user
342 - adds the ssh public key to the users' authorized_keys file
345 @param dsa: the DSA private key to write
347 @param dsapub: the DSA public key to write
349 @param rsa: the RSA private key to write
351 @param rsapub: the RSA public key to write
353 @param sshkey: the SSH private key to write
355 @param sshpub: the SSH public key to write
357 @return: the success of the operation
# Private host keys are mode 0600, public parts world-readable (0644).
360 sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
361 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
362 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
363 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
364 for name, content, mode in sshd_keys:
365 utils.WriteFile(name, data=content, mode=mode)
368 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
370 except errors.OpExecError, err:
371 _Fail("Error while processing user ssh files: %s", err, exc=True)
373 for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
374 utils.WriteFile(name, data=content, mode=0600)
376 utils.AddAuthorizedKey(auth_keys, sshpub)
# Make the SSH daemon pick up the newly written host keys.
378 result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
380 _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
381 result.cmd, result.exit_code, result.output)
384 def LeaveCluster(modify_ssh_setup):
385 """Cleans up and remove the current node.
387 This function cleans up and prepares the current node to be removed
390 If processing is successful, then it raises an
391 L{errors.QuitGanetiException} which is used as a special case to
392 shutdown the node daemon.
394 @param modify_ssh_setup: boolean
397 _CleanDirectory(constants.DATA_DIR)
398 _CleanDirectory(constants.CRYPTO_KEYS_DIR)
# SSH key removal is best-effort: errors are logged and processing
# continues.
403 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
405 utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
407 utils.RemoveFile(priv_key)
408 utils.RemoveFile(pub_key)
409 except errors.OpExecError:
410 logging.exception("Error while processing ssh files")
# Likewise for the cluster secrets: log and continue on any error.
413 utils.RemoveFile(constants.CONFD_HMAC_KEY)
414 utils.RemoveFile(constants.RAPI_CERT_FILE)
415 utils.RemoveFile(constants.NODED_CERT_FILE)
416 except: # pylint: disable-msg=W0702
417 logging.exception("Error while removing cluster secrets")
419 result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
421 logging.error("Command %s failed with exitcode %s and error %s",
422 result.cmd, result.exit_code, result.output)
424 # Raise a custom exception (handled in ganeti-noded)
425 raise errors.QuitGanetiException(True, 'Shutdown scheduled')
428 def GetNodeInfo(vgname, hypervisor_type):
429 """Gives back a hash with different information about the node.
431 @type vgname: C{string}
432 @param vgname: the name of the volume group to ask for disk space information
433 @type hypervisor_type: C{str}
434 @param hypervisor_type: the name of the hypervisor to ask for
437 @return: dictionary with the following keys:
438 - vg_size is the size of the configured volume group in MiB
439 - vg_free is the free size of the volume group in MiB
440 - memory_dom0 is the memory allocated for domain0 in MiB
441 - memory_free is the currently available (free) ram in MiB
442 - memory_total is the total number of ram in MiB
446 vginfo = _GetVGInfo(vgname)
447 outputarray['vg_size'] = vginfo['vg_size']
448 outputarray['vg_free'] = vginfo['vg_free']
# Hypervisor-specific keys (memory_*) are merged into the result.
450 hyper = hypervisor.GetHypervisor(hypervisor_type)
451 hyp_info = hyper.GetNodeInfo()
452 if hyp_info is not None:
453 outputarray.update(hyp_info)
# bootid: kernel-generated boot UUID (see _BOOT_ID_PATH).
455 outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
460 def VerifyNode(what, cluster_name):
461 """Verify the status of the local node.
463 Based on the input L{what} parameter, various checks are done on the
466 If the I{filelist} key is present, this list of
467 files is checksummed and the file/checksum pairs are returned.
469 If the I{nodelist} key is present, we check that we have
470 connectivity via ssh with the target nodes (and check the hostname
473 If the I{node-net-test} key is present, we check that we have
474 connectivity to the given nodes via both primary IP and, if
475 applicable, secondary IPs.
478 @param what: a dictionary of things to check:
479 - filelist: list of files for which to compute checksums
480 - nodelist: list of nodes we should check ssh communication with
481 - node-net-test: list of nodes we should check node daemon port
483 - hypervisor: list with hypervisors to run the verify for
485 @return: a dictionary with the same keys as the input dict, and
486 values representing the result of the checks
# Each NV_* key in 'what' is handled independently; the result dict
# uses the same keys.
490 my_name = utils.HostInfo().name
491 port = utils.GetDaemonPort(constants.NODED)
493 if constants.NV_HYPERVISOR in what:
494 result[constants.NV_HYPERVISOR] = tmp = {}
495 for hv_name in what[constants.NV_HYPERVISOR]:
497 val = hypervisor.GetHypervisor(hv_name).Verify()
498 except errors.HypervisorError, err:
499 val = "Error while checking hypervisor: %s" % str(err)
502 if constants.NV_FILELIST in what:
503 result[constants.NV_FILELIST] = utils.FingerprintFiles(
504 what[constants.NV_FILELIST])
506 if constants.NV_NODELIST in what:
507 result[constants.NV_NODELIST] = tmp = {}
# Shuffled, presumably to spread the SSH checks across nodes — TODO confirm.
508 random.shuffle(what[constants.NV_NODELIST])
509 for node in what[constants.NV_NODELIST]:
510 success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
514 if constants.NV_NODENETTEST in what:
515 result[constants.NV_NODENETTEST] = tmp = {}
# First pass: find our own primary/secondary IPs in the node list.
516 my_pip = my_sip = None
517 for name, pip, sip in what[constants.NV_NODENETTEST]:
523 tmp[my_name] = ("Can't find my own primary/secondary IP"
# Second pass: TCP-ping every node's primary (and secondary) IP.
526 for name, pip, sip in what[constants.NV_NODENETTEST]:
528 if not utils.TcpPing(pip, port, source=my_pip):
529 fail.append("primary")
531 if not utils.TcpPing(sip, port, source=my_sip):
532 fail.append("secondary")
534 tmp[name] = ("failure using the %s interface(s)" %
537 if constants.NV_MASTERIP in what:
538 # FIXME: add checks on incoming data structures (here and in the
539 # rest of the function)
540 master_name, master_ip = what[constants.NV_MASTERIP]
541 if master_name == my_name:
542 source = constants.IP4_ADDRESS_LOCALHOST
545 result[constants.NV_MASTERIP] = utils.TcpPing(master_ip, port,
548 if constants.NV_LVLIST in what:
550 val = GetVolumeList(what[constants.NV_LVLIST])
553 result[constants.NV_LVLIST] = val
555 if constants.NV_INSTANCELIST in what:
556 # GetInstanceList can fail
558 val = GetInstanceList(what[constants.NV_INSTANCELIST])
561 result[constants.NV_INSTANCELIST] = val
563 if constants.NV_VGLIST in what:
564 result[constants.NV_VGLIST] = utils.ListVolumeGroups()
566 if constants.NV_PVLIST in what:
567 result[constants.NV_PVLIST] = \
568 bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
569 filter_allocatable=False)
571 if constants.NV_VERSION in what:
572 result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
573 constants.RELEASE_VERSION)
575 if constants.NV_HVINFO in what:
576 hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
577 result[constants.NV_HVINFO] = hyper.GetNodeInfo()
# On error the DRBD minors entry carries the error string instead of a
# list.
579 if constants.NV_DRBDLIST in what:
581 used_minors = bdev.DRBD8.GetUsedDevs().keys()
582 except errors.BlockDeviceError, err:
583 logging.warning("Can't get used minors list", exc_info=True)
584 used_minors = str(err)
585 result[constants.NV_DRBDLIST] = used_minors
587 if constants.NV_DRBDHELPER in what:
590 payload = bdev.BaseDRBD.GetUsermodeHelper()
591 except errors.BlockDeviceError, err:
592 logging.error("Can't get DRBD usermode helper: %s", str(err))
595 result[constants.NV_DRBDHELPER] = (status, payload)
# Sanity checks on the expected sysfs/procfs layout.
597 if constants.NV_NODESETUP in what:
598 result[constants.NV_NODESETUP] = tmpr = []
599 if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
600 tmpr.append("The sysfs filesytem doesn't seem to be mounted"
601 " under /sys, missing required directories /sys/block"
602 " and /sys/class/net")
603 if (not os.path.isdir("/proc/sys") or
604 not os.path.isfile("/proc/sysrq-trigger")):
605 tmpr.append("The procfs filesystem doesn't seem to be mounted"
606 " under /proc, missing required directory /proc/sys and"
607 " the file /proc/sysrq-trigger")
609 if constants.NV_TIME in what:
610 result[constants.NV_TIME] = utils.SplitTime(time.time())
612 if constants.NV_OSLIST in what:
613 result[constants.NV_OSLIST] = DiagnoseOS()
618 def GetVolumeList(vg_name):
619 """Compute list of logical volumes and their size.
622 @param vg_name: the volume group whose LVs we should list
625 dictionary of all partions (key) with value being a tuple of
626 their size (in MiB), inactive and online status::
628 {'test1': ('20.06', True, True)}
630 in case of errors, a string is returned with the error
636 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
637 "--separator=%s" % sep,
638 "-olv_name,lv_size,lv_attr", vg_name])
640 _Fail("Failed to list logical volumes, lvs output: %s", result.output)
# Expect "name|size|6-char-attr" per line; anything else is logged.
642 valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
643 for line in result.stdout.splitlines():
645 match = valid_line_re.match(line)
647 logging.error("Invalid line returned from lvs output: '%s'", line)
649 name, size, attr = match.groups()
# lv_attr characters used here: [0] volume type ('v' = virtual),
# [4] state ('-' = inactive), [5] device open ('o' = online).
650 inactive = attr[4] == '-'
651 online = attr[5] == 'o'
652 virtual = attr[0] == 'v'
654 # we don't want to report such volumes as existing, since they
655 # don't really hold data
657 lvs[name] = (size, inactive, online)
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  # Thin wrapper: the actual work is done by the utils implementation.
  groups = utils.ListVolumeGroups()
  return groups
674 """List all volumes on this node.
678 A list of dictionaries, each having four keys:
679 - name: the logical volume name,
680 - size: the size of the logical volume
681 - dev: the physical device on which the LV lives
682 - vg: the volume group to which it belongs
684 In case of errors, we return an empty list and log the
687 Note that since a logical volume can live on multiple physical
688 volumes, the resulting list might include a logical volume
692 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
694 "--options=lv_name,lv_size,devices,vg_name"])
696 _Fail("Failed to list logical volumes, lvs output: %s",
700 return dev.split('(')[0]
703 return [parse_dev(x) for x in dev.split(",")]
706 line = [v.strip() for v in line]
707 return [{'name': line[0], 'size': line[1],
708 'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
711 for line in result.stdout.splitlines():
712 if line.count('|') >= 3:
713 all_devs.extend(map_line(line.split('|')))
715 logging.warning("Strange line in the output from lvs: '%s'", line)
719 def BridgesExist(bridges_list):
720 """Check if a list of bridges exist on the current node.
723 @return: C{True} if all of them exist, C{False} otherwise
# Collect every missing bridge first so the failure lists them all.
727 for bridge in bridges_list:
728 if not utils.BridgeExists(bridge):
729 missing.append(bridge)
732 _Fail("Missing bridges %s", utils.CommaJoin(missing))
735 def GetInstanceList(hypervisor_list):
736 """Provides a list of instances.
738 @type hypervisor_list: list
739 @param hypervisor_list: the list of hypervisors to query information
742 @return: a list of all running instances on the current node
743 - instance1.example.com
744 - instance2.example.com
# Instance names from all listed hypervisors are concatenated.
748 for hname in hypervisor_list:
750 names = hypervisor.GetHypervisor(hname).ListInstances()
751 results.extend(names)
752 except errors.HypervisorError, err:
753 _Fail("Error enumerating instances (hypervisor %s): %s",
754 hname, err, exc=True)
759 def GetInstanceInfo(instance, hname):
760 """Gives back the information about an instance as a dictionary.
762 @type instance: string
763 @param instance: the instance name
765 @param hname: the hypervisor type of the instance
768 @return: dictionary with the following keys:
769 - memory: memory size of instance (int)
770 - state: xen state of instance (string)
771 - time: cpu time of instance (float)
776 iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
777 if iinfo is not None:
# iinfo tuple layout as consumed here: [2]=memory, [4]=state, [5]=cpu time.
778 output['memory'] = iinfo[2]
779 output['state'] = iinfo[4]
780 output['time'] = iinfo[5]
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant
  @raise RPCFail: if the instance is not running, or has no disk
      symlinks (i.e. was not restarted since ganeti 1.2.5)

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # Iterate the disks directly (consistent with _RemoveBlockDevLinks)
  # instead of indexing via range(len(...)).
  for idx, _ in enumerate(instance.disks):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
808 def GetAllInstancesInfo(hypervisor_list):
809 """Gather data about all instances.
811 This is the equivalent of L{GetInstanceInfo}, except that it
812 computes data for all instances at once, thus being faster if one
813 needs data about more than one instance.
815 @type hypervisor_list: list
816 @param hypervisor_list: list of hypervisors to query for instance data
819 @return: dictionary of instance: data, with data having the following keys:
820 - memory: memory size of instance (int)
821 - state: xen state of instance (string)
822 - time: cpu time of instance (float)
823 - vcpus: the number of vcpus
828 for hname in hypervisor_list:
829 iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
831 for name, _, memory, vcpus, state, times in iinfo:
839 # we only check static parameters, like memory and vcpus,
840 # and not state and time which can change between the
841 # invocations of the different hypervisors
# An instance seen under two hypervisors with differing static
# parameters is treated as a hard error.
842 for key in 'memory', 'vcpus':
843 if value[key] != output[name][key]:
844 _Fail("Instance %s is running twice"
845 " with different parameters", name)
def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  components = (kind, os_name, instance, utils.TimestampForFilename())
  base = "%s-%s-%s-%s.log" % components
  return utils.PathJoin(constants.LOG_OS_DIR, base)
871 def InstanceOsAdd(instance, reinstall, debug):
872 """Add an OS to an instance.
874 @type instance: L{objects.Instance}
875 @param instance: Instance whose OS is to be installed
876 @type reinstall: boolean
877 @param reinstall: whether this is an instance reinstall
879 @param debug: debug level, passed to the OS scripts
883 inst_os = OSFromDisk(instance.os)
885 create_env = OSEnvironment(instance, inst_os, debug)
# Reinstalls are signalled to the OS scripts via the environment.
887 create_env['INSTANCE_REINSTALL'] = "1"
889 logfile = _InstanceLogName("add", instance.os, instance.name)
891 result = utils.RunCmd([inst_os.create_script], env=create_env,
892 cwd=inst_os.path, output=logfile,)
894 logging.error("os create command '%s' returned error: %s, logfile: %s,"
895 " output: %s", result.cmd, result.fail_reason, logfile,
# Include the log tail in the failure; log=False avoids double logging
# since the error was already logged above.
897 lines = [utils.SafeEncode(val)
898 for val in utils.TailFile(logfile, lines=20)]
899 _Fail("OS create script failed (%s), last lines in the"
900 " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
903 def RunRenameInstance(instance, old_name, debug):
904 """Run the OS rename script for an instance.
906 @type instance: L{objects.Instance}
907 @param instance: Instance whose OS is to be installed
908 @type old_name: string
909 @param old_name: previous instance name
911 @param debug: debug level, passed to the OS scripts
913 @return: the success of the operation
916 inst_os = OSFromDisk(instance.os)
918 rename_env = OSEnvironment(instance, inst_os, debug)
919 rename_env['OLD_INSTANCE_NAME'] = old_name
921 logfile = _InstanceLogName("rename", instance.os,
922 "%s-%s" % (old_name, instance.name))
924 result = utils.RunCmd([inst_os.rename_script], env=rename_env,
925 cwd=inst_os.path, output=logfile)
# NOTE(review): the message below says "create" although this is the
# rename script — looks like a copy/paste slip in the log text.
928 logging.error("os create command '%s' returned error: %s output: %s",
929 result.cmd, result.fail_reason, result.output)
930 lines = [utils.SafeEncode(val)
931 for val in utils.TailFile(logfile, lines=20)]
932 _Fail("OS rename script failed (%s), last lines in the"
933 " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
936 def _GetVGInfo(vg_name):
937 """Get information about the volume group.
940 @param vg_name: the volume group which we query
943 A dictionary with the following keys:
944 - C{vg_size} is the total size of the volume group in MiB
945 - C{vg_free} is the free size of the volume group in MiB
946 - C{pv_count} are the number of physical disks in that VG
948 If an error occurs during gathering of data, we return the same dict
949 with keys all set to None.
# Start with all keys set to None; they stay None on any failure path.
952 retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
954 retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
955 "--nosuffix", "--units=m", "--separator=:", vg_name])
958 logging.error("volume group %s not present", vg_name)
960 valarr = retval.stdout.strip().rstrip(':').split(':')
# Sizes are reported by vgs in MiB and rounded to whole integers here.
964 "vg_size": int(round(float(valarr[0]), 0)),
965 "vg_free": int(round(float(valarr[1]), 0)),
966 "pv_count": int(valarr[2]),
968 except (TypeError, ValueError), err:
969 logging.exception("Fail to parse vgs output: %s", err)
971 logging.error("vgs output has the wrong number of fields (expected"
972 " three): %s", str(valarr))
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the symlink path for an instance's disk at index idx.

  The link lives in C{constants.DISK_LINKS_DIR} and is named
  C{"<instance_name>:<idx>"}.

  """
  link_base = "%s:%d" % (instance_name, idx)
  return utils.PathJoin(constants.DISK_LINKS_DIR, link_base)
981 def _SymlinkBlockDev(instance_name, device_path, idx):
982 """Set up symlinks to a instance's block device.
984 This is an auxiliary function run when an instance is start (on the primary
985 node) or when an instance is migrated (on the target node).
988 @param instance_name: the name of the target instance
989 @param device_path: path of the physical block device, on the node
990 @param idx: the disk index
991 @return: absolute path to the disk's symlink
994 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
996 os.symlink(device_path, link_name)
# On EEXIST, replace the link only if it points to a different target.
998 if err.errno == errno.EEXIST:
999 if (not os.path.islink(link_name) or
1000 os.readlink(link_name) != device_path):
1001 os.remove(link_name)
1002 os.symlink(device_path, link_name)
1009 def _RemoveBlockDevLinks(instance_name, disks):
1010 """Remove the block device symlinks belonging to the given instance.
# Best-effort cleanup: removal failures are logged and ignored.
1013 for idx, _ in enumerate(disks):
1014 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1015 if os.path.islink(link_name):
1017 os.remove(link_name)
1019 logging.exception("Can't remove symlink '%s'", link_name)
1022 def _GatherAndLinkBlockDevs(instance):
1023 """Set up an instance's block device(s).
1025 This is run on the primary node at instance startup. The block
1026 devices must be already assembled.
1028 @type instance: L{objects.Instance}
1029 @param instance: the instance whose disks we shoul assemble
1031 @return: list of (disk_object, device_path)
1035 for idx, disk in enumerate(instance.disks):
1036 device = _RecursiveFindBD(disk)
# A disk that cannot be found or linked aborts the whole operation.
1038 raise errors.BlockDeviceError("Block device '%s' is not set up." %
1042 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1044 raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1047 block_devices.append((disk, link_name))
1049 return block_devices
1052 def StartInstance(instance):
1053 """Start an instance.
1055 @type instance: L{objects.Instance}
1056 @param instance: the instance object
# Starting an already-running instance is a no-op, not an error.
1060 running_instances = GetInstanceList([instance.hypervisor])
1062 if instance.name in running_instances:
1063 logging.info("Instance %s already running, not starting", instance.name)
1067 block_devices = _GatherAndLinkBlockDevs(instance)
1068 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1069 hyper.StartInstance(instance, block_devices)
1070 except errors.BlockDeviceError, err:
1071 _Fail("Block device error: %s", err, exc=True)
# On hypervisor failure, undo the disk symlinks created above.
1072 except errors.HypervisorError, err:
1073 _RemoveBlockDevLinks(instance.name, instance.disks)
1074 _Fail("Hypervisor error: %s", err, exc=True)
1077 def InstanceShutdown(instance, timeout):
1078 """Shut an instance down.
1080 @note: this functions uses polling with a hardcoded timeout.
1082 @type instance: L{objects.Instance}
1083 @param instance: the instance object
1084 @type timeout: integer
1085 @param timeout: maximum timeout for soft shutdown
1089 hv_name = instance.hypervisor
1090 hyper = hypervisor.GetHypervisor(hv_name)
1091 iname = instance.name
1093 if instance.name not in hyper.ListInstances():
1094 logging.info("Instance %s not running, doing nothing", iname)
# Retry helper: soft-stop the instance until it disappears from the
# hypervisor's instance list, signalling retries via RetryAgain.
1099 self.tried_once = False
1102 if iname not in hyper.ListInstances():
1106 hyper.StopInstance(instance, retry=self.tried_once)
1107 except errors.HypervisorError, err:
1108 if iname not in hyper.ListInstances():
1109 # if the instance is no longer existing, consider this a
1110 # success and go to cleanup
1113 _Fail("Failed to stop instance %s: %s", iname, err)
1115 self.tried_once = True
1117 raise utils.RetryAgain()
# Soft shutdown with retries first; fall back to a forced stop.
1120 utils.Retry(_TryShutdown(), 5, timeout)
1121 except utils.RetryTimeout:
1122 # the shutdown did not succeed
1123 logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1126 hyper.StopInstance(instance, force=True)
1127 except errors.HypervisorError, err:
1128 if iname in hyper.ListInstances():
1129 # only raise an error if the instance still exists, otherwise
1130 # the error could simply be "instance ... unknown"!
1131 _Fail("Failed to force stop instance %s: %s", iname, err)
1135 if iname in hyper.ListInstances():
1136 _Fail("Could not shutdown instance %s even by destroy", iname)
# Cleanup failures are only warnings; the disk symlinks go regardless.
1139 hyper.CleanupInstance(instance.name)
1140 except errors.HypervisorError, err:
1141 logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1143 _RemoveBlockDevLinks(iname, instance.disks)
1146 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1147 """Reboot an instance.
1149 @type instance: L{objects.Instance}
1150 @param instance: the instance object to reboot
1151 @type reboot_type: str
1152 @param reboot_type: the type of reboot, one the following
1154 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1155 instance OS, do not recreate the VM
1156 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1157 restart the VM (at the hypervisor level)
1158 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1159 not accepted here, since that mode is handled differently, in
1160 cmdlib, and translates into full stop and start of the
1161 instance (instead of a call_instance_reboot RPC)
1162 @type shutdown_timeout: integer
1163 @param shutdown_timeout: maximum timeout for soft shutdown
1167 running_instances = GetInstanceList([instance.hypervisor])
1169 if instance.name not in running_instances:
1170 _Fail("Cannot reboot instance %s that is not running", instance.name)
1172 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1173 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1175 hyper.RebootInstance(instance)
1176 except errors.HypervisorError, err:
1177 _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
# HARD reboot is implemented as a full node-level stop + start.
1178 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1180 InstanceShutdown(instance, shutdown_timeout)
1181 return StartInstance(instance)
1182 except errors.HypervisorError, err:
1183 _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1185 _Fail("Invalid reboot_type received: %s", reboot_type)
1188 def MigrationInfo(instance):
1189 """Gather information about an instance to be migrated.
1191 @type instance: L{objects.Instance}
1192 @param instance: the instance definition
# The payload is hypervisor-specific and opaque at this layer.
1195 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1197 info = hyper.MigrationInfo(instance)
1198 except errors.HypervisorError, err:
1199 _Fail("Failed to fetch migration information: %s", err, exc=True)
1203 def AcceptInstance(instance, info, target):
1204 """Prepare the node to accept an instance.
1206 @type instance: L{objects.Instance}
1207 @param instance: the instance definition
1208 @type info: string/data (opaque)
1209 @param info: migration information, from the source node
1210 @type target: string
1211 @param target: target host (usually ip), on this node
1214 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1216 hyper.AcceptInstance(instance, info, target)
1217 except errors.HypervisorError, err:
1218 _Fail("Failed to accept instance: %s", err, exc=True)
1221 def FinalizeMigration(instance, info, success):
1222 """Finalize any preparation to accept an instance.
1224 @type instance: L{objects.Instance}
1225 @param instance: the instance definition
1226 @type info: string/data (opaque)
1227 @param info: migration information, from the source node
1228 @type success: boolean
1229 @param success: whether the migration was a success or a failure
1232 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1234 hyper.FinalizeMigration(instance, info, success)
1235 except errors.HypervisorError, err:
1236 _Fail("Failed to finalize migration: %s", err, exc=True)
1239 def MigrateInstance(instance, target, live):
1240 """Migrates an instance to another node.
1242 @type instance: L{objects.Instance}
1243 @param instance: the instance definition
1244 @type target: string
1245 @param target: the target node name
1247 @param live: whether the migration should be done live or not (the
1248 interpretation of this parameter is left to the hypervisor)
1250 @return: a tuple of (success, msg) where:
1251 - succes is a boolean denoting the success/failure of the operation
1252 - msg is a string with details in case of failure
1255 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1258 hyper.MigrateInstance(instance, target, live)
1259 except errors.HypervisorError, err:
1260 _Fail("Failed to migrate instance: %s", err, exc=True)
1263 def BlockdevCreate(disk, size, owner, on_primary, info):
1264 """Creates a block device for an instance.
1266 @type disk: L{objects.Disk}
1267 @param disk: the object describing the disk we should create
1269 @param size: the size of the physical underlying device, in MiB
1271 @param owner: the name of the instance for which disk is created,
1272 used for device cache data
1273 @type on_primary: boolean
1274 @param on_primary: indicates if it is the primary node or not
1276 @param info: string that will be sent to the physical device
1277 creation, used for example to set (LVM) tags on LVs
1279 @return: the new unique_id of the device (this can sometime be
1280 computed only after creation), or None. On secondary nodes,
1281 it's not required to return anything.
1284 # TODO: remove the obsolete 'size' argument
1285 # pylint: disable-msg=W0613
1288 for child in disk.children:
1290 crdev = _RecursiveAssembleBD(child, owner, on_primary)
1291 except errors.BlockDeviceError, err:
1292 _Fail("Can't assemble device %s: %s", child, err)
1293 if on_primary or disk.AssembleOnSecondary():
1294 # we need the children open in case the device itself has to
1297 # pylint: disable-msg=E1103
1299 except errors.BlockDeviceError, err:
1300 _Fail("Can't make child '%s' read-write: %s", child, err)
1304 device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1305 except errors.BlockDeviceError, err:
1306 _Fail("Can't create block device: %s", err)
1308 if on_primary or disk.AssembleOnSecondary():
1311 except errors.BlockDeviceError, err:
1312 _Fail("Can't assemble device after creation, unusual event: %s", err)
1313 device.SetSyncSpeed(constants.SYNC_SPEED)
1314 if on_primary or disk.OpenOnSecondary():
1316 device.Open(force=True)
1317 except errors.BlockDeviceError, err:
1318 _Fail("Can't make device r/w after creation, unusual event: %s", err)
1319 DevCacheManager.UpdateCache(device.dev_path, owner,
1320 on_primary, disk.iv_name)
1322 device.SetInfo(info)
1324 return device.unique_id
1327 def BlockdevRemove(disk):
1328 """Remove a block device.
1330 @note: This is intended to be called recursively.
1332 @type disk: L{objects.Disk}
1333 @param disk: the disk object we should remove
1335 @return: the success of the operation
1340 rdev = _RecursiveFindBD(disk)
1341 except errors.BlockDeviceError, err:
1342 # probably can't attach
1343 logging.info("Can't attach to device %s in remove", disk)
1345 if rdev is not None:
1346 r_path = rdev.dev_path
1349 except errors.BlockDeviceError, err:
1350 msgs.append(str(err))
1352 DevCacheManager.RemoveCache(r_path)
1355 for child in disk.children:
1357 BlockdevRemove(child)
1358 except RPCFail, err:
1359 msgs.append(str(err))
1362 _Fail("; ".join(msgs))
1365 def _RecursiveAssembleBD(disk, owner, as_primary):
1366 """Activate a block device for an instance.
1368 This is run on the primary and secondary nodes for an instance.
1370 @note: this function is called recursively.
1372 @type disk: L{objects.Disk}
1373 @param disk: the disk we try to assemble
1375 @param owner: the name of the instance which owns the disk
1376 @type as_primary: boolean
1377 @param as_primary: if we should make the block device
1380 @return: the assembled device or None (in case no device
1382 @raise errors.BlockDeviceError: in case there is an error
1383 during the activation of the children or the device
1389 mcn = disk.ChildrenNeeded()
1391 mcn = 0 # max number of Nones allowed
1393 mcn = len(disk.children) - mcn # max number of Nones
1394 for chld_disk in disk.children:
1396 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1397 except errors.BlockDeviceError, err:
1398 if children.count(None) >= mcn:
1401 logging.error("Error in child activation (but continuing): %s",
1403 children.append(cdev)
1405 if as_primary or disk.AssembleOnSecondary():
1406 r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1407 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1409 if as_primary or disk.OpenOnSecondary():
1411 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1412 as_primary, disk.iv_name)
1419 def BlockdevAssemble(disk, owner, as_primary):
1420 """Activate a block device for an instance.
1422 This is a wrapper over _RecursiveAssembleBD.
1424 @rtype: str or boolean
1425 @return: a C{/dev/...} path for primary nodes, and
1426 C{True} for secondary nodes
1430 result = _RecursiveAssembleBD(disk, owner, as_primary)
1431 if isinstance(result, bdev.BlockDev):
1432 # pylint: disable-msg=E1103
1433 result = result.dev_path
1434 except errors.BlockDeviceError, err:
1435 _Fail("Error while assembling disk: %s", err, exc=True)
1440 def BlockdevShutdown(disk):
1441 """Shut down a block device.
1443 First, if the device is assembled (Attach() is successful), then
1444 the device is shutdown. Then the children of the device are
1447 This function is called recursively. Note that we don't cache the
1448 children or such, as oppossed to assemble, shutdown of different
1449 devices doesn't require that the upper device was active.
1451 @type disk: L{objects.Disk}
1452 @param disk: the description of the disk we should
1458 r_dev = _RecursiveFindBD(disk)
1459 if r_dev is not None:
1460 r_path = r_dev.dev_path
1463 DevCacheManager.RemoveCache(r_path)
1464 except errors.BlockDeviceError, err:
1465 msgs.append(str(err))
1468 for child in disk.children:
1470 BlockdevShutdown(child)
1471 except RPCFail, err:
1472 msgs.append(str(err))
1475 _Fail("; ".join(msgs))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      # no static path known for this child, resolve it dynamically
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open
  @return: the open block device instance
  @raise RPCFail: if the device is not activated

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk
1586 def BlockdevFind(disk):
1587 """Check if a device is activated.
1589 If it is, return information about the real device.
1591 @type disk: L{objects.Disk}
1592 @param disk: the disk to find
1593 @rtype: None or objects.BlockDevStatus
1594 @return: None if the disk cannot be found, otherwise a the current
1599 rbd = _RecursiveFindBD(disk)
1600 except errors.BlockDeviceError, err:
1601 _Fail("Failed to find device: %s", err, exc=True)
1606 return rbd.GetSyncStatus()
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                               " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)
def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  # guard against exceptions whose errno attribute is None or an
  # unknown value, which would otherwise raise KeyError here
  if hasattr(err, 'errno') and err.errno in errno.errorcode:
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail
1742 def _OSOndiskAPIVersion(os_dir):
1743 """Compute and return the API version of a given OS.
1745 This function will try to read the API version of the OS residing in
1746 the 'os_dir' directory.
1749 @param os_dir: the directory in which we should look for the OS
1751 @return: tuple (status, data) with status denoting the validity and
1752 data holding either the vaid versions or an error message
1755 api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1758 st = os.stat(api_file)
1759 except EnvironmentError, err:
1760 return False, ("Required file '%s' not found under path %s: %s" %
1761 (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1763 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1764 return False, ("File '%s' in %s is not a regular file" %
1765 (constants.OS_API_FILE, os_dir))
1768 api_versions = utils.ReadFile(api_file).splitlines()
1769 except EnvironmentError, err:
1770 return False, ("Error while reading the API version file at %s: %s" %
1771 (api_file, _ErrnoOrStr(err)))
1774 api_versions = [int(version.strip()) for version in api_versions]
1775 except (TypeError, ValueError), err:
1776 return False, ("API version(s) can't be converted to integer: %s" %
1779 return True, api_versions
1782 def DiagnoseOS(top_dirs=None):
1783 """Compute the validity for all OSes.
1785 @type top_dirs: list
1786 @param top_dirs: the list of directories in which to
1787 search (if not given defaults to
1788 L{constants.OS_SEARCH_PATH})
1789 @rtype: list of L{objects.OS}
1790 @return: a list of tuples (name, path, status, diagnose, variants,
1791 parameters, api_version) for all (potential) OSes under all
1792 search paths, where:
1793 - name is the (potential) OS name
1794 - path is the full path to the OS
1795 - status True/False is the validity of the OS
1796 - diagnose is the error message for an invalid OS, otherwise empty
1797 - variants is a list of supported OS variants, if any
1798 - parameters is a list of (name, help) parameters, if any
1799 - api_version is a list of support OS API versions
1802 if top_dirs is None:
1803 top_dirs = constants.OS_SEARCH_PATH
1806 for dir_name in top_dirs:
1807 if os.path.isdir(dir_name):
1809 f_names = utils.ListVisibleFiles(dir_name)
1810 except EnvironmentError, err:
1811 logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1813 for name in f_names:
1814 os_path = utils.PathJoin(dir_name, name)
1815 status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1818 variants = os_inst.supported_variants
1819 parameters = os_inst.supported_parameters
1820 api_versions = os_inst.api_versions
1823 variants = parameters = api_versions = []
1824 result.append((name, os_path, status, diagnose, variants,
1825 parameters, api_versions))
1830 def _TryOSFromDisk(name, base_dir=None):
1831 """Create an OS instance from disk.
1833 This function will return an OS instance if the given name is a
1836 @type base_dir: string
1837 @keyword base_dir: Base directory containing OS installations.
1838 Defaults to a search in all the OS_SEARCH_PATH dirs.
1840 @return: success and either the OS instance if we find a valid one,
1844 if base_dir is None:
1845 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1847 os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1850 return False, "Directory for OS %s not found in search path" % name
1852 status, api_versions = _OSOndiskAPIVersion(os_dir)
1855 return status, api_versions
1857 if not constants.OS_API_VERSIONS.intersection(api_versions):
1858 return False, ("API version mismatch for path '%s': found %s, want %s." %
1859 (os_dir, api_versions, constants.OS_API_VERSIONS))
1861 # OS Files dictionary, we will populate it with the absolute path names
1862 os_files = dict.fromkeys(constants.OS_SCRIPTS)
1864 if max(api_versions) >= constants.OS_API_V15:
1865 os_files[constants.OS_VARIANTS_FILE] = ''
1867 if max(api_versions) >= constants.OS_API_V20:
1868 os_files[constants.OS_PARAMETERS_FILE] = ''
1870 del os_files[constants.OS_SCRIPT_VERIFY]
1872 for filename in os_files:
1873 os_files[filename] = utils.PathJoin(os_dir, filename)
1876 st = os.stat(os_files[filename])
1877 except EnvironmentError, err:
1878 return False, ("File '%s' under path '%s' is missing (%s)" %
1879 (filename, os_dir, _ErrnoOrStr(err)))
1881 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1882 return False, ("File '%s' under path '%s' is not a regular file" %
1885 if filename in constants.OS_SCRIPTS:
1886 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1887 return False, ("File '%s' under path '%s' is not executable" %
1891 if constants.OS_VARIANTS_FILE in os_files:
1892 variants_file = os_files[constants.OS_VARIANTS_FILE]
1894 variants = utils.ReadFile(variants_file).splitlines()
1895 except EnvironmentError, err:
1896 return False, ("Error while reading the OS variants file at %s: %s" %
1897 (variants_file, _ErrnoOrStr(err)))
1899 return False, ("No supported os variant found")
1902 if constants.OS_PARAMETERS_FILE in os_files:
1903 parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1905 parameters = utils.ReadFile(parameters_file).splitlines()
1906 except EnvironmentError, err:
1907 return False, ("Error while reading the OS parameters file at %s: %s" %
1908 (parameters_file, _ErrnoOrStr(err)))
1909 parameters = [v.split(None, 1) for v in parameters]
1911 os_obj = objects.OS(name=name, path=os_dir,
1912 create_script=os_files[constants.OS_SCRIPT_CREATE],
1913 export_script=os_files[constants.OS_SCRIPT_EXPORT],
1914 import_script=os_files[constants.OS_SCRIPT_IMPORT],
1915 rename_script=os_files[constants.OS_SCRIPT_RENAME],
1916 verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1918 supported_variants=variants,
1919 supported_parameters=parameters,
1920 api_versions=api_versions)
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # strip the variant suffix ("name+variant") before looking on disk
  name_only = name.split("+", 1)[0]
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload
def OSCoreEnv(inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['OS_NAME'] = inst_os.name
  result['DEBUG_LEVEL'] = '%d' % debug

  # OS variants
  if api_version >= constants.OS_API_V15:
    try:
      # "name+variant" selects an explicit variant
      variant = inst_os.name.split('+', 1)[1]
    except IndexError:
      # no explicit variant: default to the first supported one
      variant = inst_os.supported_variants[0]
    result['OS_VARIANT'] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result['OSP_%s' % pname.upper()] = pvalue

  return result
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(inst_os, instance.osparams, debug=debug)

  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
2047 def BlockdevGrow(disk, amount):
2048 """Grow a stack of block devices.
2050 This function is called recursively, with the childrens being the
2051 first ones to resize.
2053 @type disk: L{objects.Disk}
2054 @param disk: the disk to be grown
2055 @rtype: (status, result)
2056 @return: a tuple with the status of the operation
2057 (True/False), and the errors message if status
2061 r_dev = _RecursiveFindBD(disk)
2063 _Fail("Cannot find block device %s", disk)
2067 except errors.BlockDeviceError, err:
2068 _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    # snapshot the backing storage instead of the DRBD device itself
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file
  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
                 '%s' % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # atomically replace any previous export with the same name
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
  else:
    _Fail("No exports directory")
2217 def RemoveExport(export):
2218 """Remove an existing export from the node.
2221 @param export: the name of the export to remove
2225 target = utils.PathJoin(constants.EXPORT_DIR, export)
2228 shutil.rmtree(target)
2229 except EnvironmentError, err:
2230 _Fail("Error while removing the export: %s", err, exc=True)
2233 def BlockdevRename(devlist):
2234 """Rename a list of block devices.
2236 @type devlist: list of tuples
2237 @param devlist: list of tuples of the form (disk,
2238 new_logical_id, new_physical_id); disk is an
2239 L{objects.Disk} object describing the current disk,
2240 and new logical_id/physical_id is the name we
2243 @return: True if all renames succeeded, False otherwise
2248 for disk, unique_id in devlist:
2249 dev = _RecursiveFindBD(disk)
2251 msgs.append("Can't find device %s in rename" % str(disk))
2255 old_rpath = dev.dev_path
2256 dev.Rename(unique_id)
2257 new_rpath = dev.dev_path
2258 if old_rpath != new_rpath:
2259 DevCacheManager.RemoveCache(old_rpath)
2260 # FIXME: we should add the new cache information here, like:
2261 # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2262 # but we don't have the owner here - maybe parse from existing
2263 # cache? for now, we only lose lvm data when we rename, which
2264 # is less critical than DRBD or MD
2265 except errors.BlockDeviceError, err:
2266 msgs.append("Can't rename device '%s' to '%s': %s" %
2267 (dev, unique_id, err))
2268 logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2271 _Fail("; ".join(msgs))
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks wheter the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check
  @rtype: str
  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
      base_file_storage_dir):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  # validate/normalize against the cluster-wide base directory first
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
      # mode 0750: owner has full access, group may read/traverse,
      # others have no access
      os.makedirs(file_storage_dir, 0750)
  except OSError, err:
    _Fail("Cannot create file storage directory '%s': %s",
          file_storage_dir, err, exc=True)
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup

  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified Storage directory '%s' is not a directory",
    # deletes dir only if empty, otherwise we want to fail the rpc call
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to

  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  # both paths are validated against the cluster-wide base directory
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  # only rename when the destination does not exist yet
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
    # destination already exists: only an error if the source does too
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check

  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # the common prefix equals the queue dir only if file_name is inside it
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @param content: the new job contents

  @return: the success of the operation

  """
  # reject paths outside the queue directory
  _EnsureJobQueueFile(file_name)

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @param old: the old (actual) file name
  @param new: the desired file name

  @return: the success of the operation and payload

  """
  # both source and destination must live inside the queue directory;
  # the source is checked first, so its error is reported first
  for queue_file in (old, new):
    _EnsureJobQueueFile(queue_file)

  utils.RenameFile(old, new, mkdir=True)
def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      which will contain the error details in case we
      failed

  """
    rd = _RecursiveFindBD(cf)
      _Fail("Can't find device %s", cf)
    except errors.BlockDeviceError, err:
      msg.append(str(err))
    # report all per-device close errors in a single RPC failure
    _Fail("Can't make devices secondary: %s", ",".join(msg))
      _RemoveBlockDevLinks(instance_name, disks)
def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated

  @raise RPCFail: if the parameters are not valid

  """
    # delegate the actual validation to the hypervisor implementation
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)
def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  # each supported_parameters entry's first element is the parameter name
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters

  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  # strip any variant suffix ("os+variant") before the on-disk lookup
  name_only = osname.split("+", 1)[0]
  status, tbv = _TryOSFromDisk(name_only, None)

  if max(tbv.api_versions) < constants.OS_API_V20:

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)
2553 """Demotes the current node from master candidate role.
2556 # try to ensure we're not the master by mistake
2557 master, myself = ssconf.GetMasterAndMyself()
2558 if master == myself:
2559 _Fail("ssconf status shows I'm the master node, will not demote")
2561 result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2562 if not result.failed:
2563 _Fail("The master daemon is running, will not demote")
2566 if os.path.isfile(constants.CLUSTER_CONF_FILE):
2567 utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2568 except EnvironmentError, err:
2569 if err.errno != errno.ENOENT:
2570 _Fail("Error while backing up cluster file: %s", err, exc=True)
2572 utils.RemoveFile(constants.CLUSTER_CONF_FILE)
def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  @param cryptodir: base directory holding the per-certificate
      subdirectories
  @param name: the certificate name (its subdirectory name)
  @return: tuple of (certificate directory, private key path,
      certificate path)

  """
  cert_dir = utils.PathJoin(cryptodir, name)
  key_file = utils.PathJoin(cryptodir, name, _X509_KEY_FILE)
  cert_file = utils.PathJoin(cryptodir, name, _X509_CERT_FILE)
  return (cert_dir, key_file, cert_file)
def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  # the requested validity is capped at the allowed maximum
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
    # the certificate name is the (unique) temporary directory name
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    # mode 0400: readable only by the owning user
    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
    shutil.rmtree(cert_dir, ignore_errors=True)
def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
  """Removes a X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  # remove key and certificate first, then the containing directory
  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  if ieio == constants.IEIO_FILE:
    # direct file import/export; the single argument is the file path
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    directory = os.path.normpath(os.path.dirname(filename))

    # only files under the exports directory are acceptable
    if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
        constants.EXPORT_DIR):
      _Fail("File '%s' is not under exports directory '%s'",
            filename, constants.EXPORT_DIR)

    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    # the transfer is redirected from/to the file via shell redirection
    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

    # Retrieve file size
      st = os.stat(filename)
    except EnvironmentError, err:
      logging.error("Can't stat(2) %s: %s", filename, err)
      exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly ignored; we use nocreat to fail if the device
      # is not already there or we pass a wrong path; we use notrunc to no
      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
      # much memory; this means that at best, we flush every 64k, which will
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   str(1024 * 1024), # 1 MB

      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    # the OS' own import/export script is used for the disk I/O
    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

      # Let script predict size
      exp_size = constants.IE_CUSTOM_SIZE

    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  # the directory name encodes the operation prefix and a timestamp
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                                   (prefix, utils.TimestampForFilename())))
def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/output mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @param host: Remote host for export (None for import)
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    # an import must not be given a remote endpoint
    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    # an export connects to the remote side, so both are mandatory
    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

    _Fail("Invalid mode %r", mode)

  # key name and CA must be given (or omitted) together
  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # use the node daemon's certificate for both key and cert
    key_path = constants.NODED_CERT_FILE
    cert_path = constants.NODED_CERT_FILE
    assert opts.ca_pem is None
    (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir(prefix)
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # use the node daemon certificate as the CA
      ca = utils.ReadFile(constants.NODED_CERT_FILE)

    # mode 0400: the CA copy is readable only by the owning user
    utils.WriteFile(ca_file, data=ca, mode=0400)

      constants.IMPORT_EXPORT_DAEMON,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,

      cmd.append("--host=%s" % host)

      cmd.append("--port=%s" % port)

      cmd.append("--compress=%s" % opts.compress)

      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

      cmd.append("--cmd-prefix=%s" % cmd_prefix)

      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    logfile = _InstanceLogName(prefix, instance.os, instance.name)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

    shutil.rmtree(status_dir, ignore_errors=True)
def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if a
           status couldn't be read

  """
    status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      # a missing status file is not an error; everything else propagates
      if err.errno != errno.ENOENT:
      result.append(serializer.LoadJson(data))
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  @param name: the import/export (status directory) name

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
    # a daemon that already exited is silently ignored
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  @param name: the import/export (status directory) name

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

    logging.info("Import/export %s is still running with PID %s",
    # don't wait for the daemon: the status dir is removed regardless
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = utils.HostInfo().name
    cf.SetPhysicalID(my_name, nodes_ip)

    rd = _RecursiveFindBD(cf)
      _Fail("Can't find device %s", cf)
def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

    for idx, rd in enumerate(bdevs):
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone
    all_connected = True

      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

    # change to primary mode
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
    # a device that is neither connected nor resyncing is retried
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()

  bdevs = _FindDisks(nodes_ip, disks)

      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  @param hypervisor_type: the hypervisor whose powercycle method is used

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
    # if we can't fork, we'll pretend that we're in the child process
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running on ram
  except Exception: # pylint: disable-msg=W0703
  hyper.PowercycleNode()
class HooksRunner(object):
  This class is instantiated on the node side (ganeti-noded) and not
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @param hpath: the path to the hooks directory which
        holds the scripts
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @param env: dictionary with the environment for the hook

    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input

    """
    # the phase selects the "<hpath>-pre.d"/"<hpath>-post.d" subdirectory
    if phase == constants.HOOKS_PHASE_PRE:
    elif phase == constants.HOOKS_PHASE_POST:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)
    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    # translate the RunParts status of each script into a hook result
    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(name, idata):
    """Run an iallocator script.

    @param name: the iallocator script name
    @param idata: the allocator input data

    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # the input data is passed to the script via a temporary file
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
      result = utils.RunCmd([alloc_script, fin_name])
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)

    return result.stdout
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  # prefix stripped from device paths when building cache file names
  _DEV_PREFIX = "/dev/"
  # directory holding the per-device cache files
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @param dev_path: the C{/dev/} path name

    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)

  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @param dev_path: the pathname of the device
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node nor not
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
    fpath = cls._ConvertPath(dev_path)
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      # a cache update failure is only logged, never fatal
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.RemoveFile} with a converted
    path name and logging.

    @param dev_path: the pathname of the device

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
    fpath = cls._ConvertPath(dev_path)
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      # a cache removal failure is only logged, never fatal
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)