4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""
31 # pylint: disable-msg=E1103
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"

# Directories that _CleanDirectory is allowed to wipe; anything else is
# rejected to protect against accidental deletion elsewhere on the node.
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.DATA_DIR,
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.QUEUE_DIR,
  constants.CRYPTO_KEYS_DIR,
  ])

# Maximum validity (in seconds) of generated X509 certificates: one week
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60

_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """
def _Fail(msg, *args, **kwargs):
  """Log an error and the raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail: always

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]:  # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data
  @raise AssertionError: if the encoding marker is unknown

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
    constants.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  # Every hypervisor may declare additional files that the master is
  # allowed to push to the node
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed_files)
# Computed once at import time; see _BuildUploadFileList
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: tuple
  @return: True, None

  """
  # The queue lock file must survive the purge, as the daemon holds it open
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
221 """Returns master information.
223 This is an utility function to compute master information, either
224 for consumption here or from the node daemon.
227 @return: master_netdev, master_ip, master_name
228 @raise RPCFail: in case of errors
233 master_netdev = cfg.GetMasterNetdev()
234 master_ip = cfg.GetMasterIP()
235 master_node = cfg.GetMasterNode()
236 except errors.ConfigurationError, err:
237 _Fail("Cluster configuration incomplete: %s", err, exc=True)
238 return (master_netdev, master_ip, master_node)
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  err_msgs = []
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      msg = "Someone else has the master ip, not activating"
      logging.error(msg)
      err_msgs.append(msg)
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      msg = "Can't activate master IP: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    if no_voting:
      masterd_args = "--no-voting --yes-do-it"
    else:
      masterd_args = ""

    env = {
      "EXTRA_MASTERD_ARGS": masterd_args,
      }

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      msg = "Can't start Ganeti master: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

  if err_msgs:
    _Fail("; ".join(err_msgs))
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                    " and error %s",
                    result.cmd, result.exit_code, result.output)
336 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
337 """Joins this node to the cluster.
339 This does the following:
340 - updates the hostkeys of the machine (rsa and dsa)
341 - adds the ssh private key to the user
342 - adds the ssh public key to the users' authorized_keys file
345 @param dsa: the DSA private key to write
347 @param dsapub: the DSA public key to write
349 @param rsa: the RSA private key to write
351 @param rsapub: the RSA public key to write
353 @param sshkey: the SSH private key to write
355 @param sshpub: the SSH public key to write
357 @return: the success of the operation
360 sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
361 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
362 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
363 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
364 for name, content, mode in sshd_keys:
365 utils.WriteFile(name, data=content, mode=mode)
368 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
370 except errors.OpExecError, err:
371 _Fail("Error while processing user ssh files: %s", err, exc=True)
373 for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
374 utils.WriteFile(name, data=content, mode=0600)
376 utils.AddAuthorizedKey(auth_keys, sshpub)
378 result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
380 _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
381 result.cmd, result.exit_code, result.output)
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable-msg=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return outputarray
460 def VerifyNode(what, cluster_name):
461 """Verify the status of the local node.
463 Based on the input L{what} parameter, various checks are done on the
466 If the I{filelist} key is present, this list of
467 files is checksummed and the file/checksum pairs are returned.
469 If the I{nodelist} key is present, we check that we have
470 connectivity via ssh with the target nodes (and check the hostname
473 If the I{node-net-test} key is present, we check that we have
474 connectivity to the given nodes via both primary IP and, if
475 applicable, secondary IPs.
478 @param what: a dictionary of things to check:
479 - filelist: list of files for which to compute checksums
480 - nodelist: list of nodes we should check ssh communication with
481 - node-net-test: list of nodes we should check node daemon port
483 - hypervisor: list with hypervisors to run the verify for
485 @return: a dictionary with the same keys as the input dict, and
486 values representing the result of the checks
490 my_name = utils.HostInfo().name
491 port = utils.GetDaemonPort(constants.NODED)
493 if constants.NV_HYPERVISOR in what:
494 result[constants.NV_HYPERVISOR] = tmp = {}
495 for hv_name in what[constants.NV_HYPERVISOR]:
497 val = hypervisor.GetHypervisor(hv_name).Verify()
498 except errors.HypervisorError, err:
499 val = "Error while checking hypervisor: %s" % str(err)
502 if constants.NV_FILELIST in what:
503 result[constants.NV_FILELIST] = utils.FingerprintFiles(
504 what[constants.NV_FILELIST])
506 if constants.NV_NODELIST in what:
507 result[constants.NV_NODELIST] = tmp = {}
508 random.shuffle(what[constants.NV_NODELIST])
509 for node in what[constants.NV_NODELIST]:
510 success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
514 if constants.NV_NODENETTEST in what:
515 result[constants.NV_NODENETTEST] = tmp = {}
516 my_pip = my_sip = None
517 for name, pip, sip in what[constants.NV_NODENETTEST]:
523 tmp[my_name] = ("Can't find my own primary/secondary IP"
526 for name, pip, sip in what[constants.NV_NODENETTEST]:
528 if not utils.TcpPing(pip, port, source=my_pip):
529 fail.append("primary")
531 if not utils.TcpPing(sip, port, source=my_sip):
532 fail.append("secondary")
534 tmp[name] = ("failure using the %s interface(s)" %
537 if constants.NV_MASTERIP in what:
538 # FIXME: add checks on incoming data structures (here and in the
539 # rest of the function)
540 master_name, master_ip = what[constants.NV_MASTERIP]
541 if master_name == my_name:
542 source = constants.IP4_ADDRESS_LOCALHOST
545 result[constants.NV_MASTERIP] = utils.TcpPing(master_ip, port,
548 if constants.NV_LVLIST in what:
550 val = GetVolumeList(what[constants.NV_LVLIST])
553 result[constants.NV_LVLIST] = val
555 if constants.NV_INSTANCELIST in what:
556 # GetInstanceList can fail
558 val = GetInstanceList(what[constants.NV_INSTANCELIST])
561 result[constants.NV_INSTANCELIST] = val
563 if constants.NV_VGLIST in what:
564 result[constants.NV_VGLIST] = utils.ListVolumeGroups()
566 if constants.NV_PVLIST in what:
567 result[constants.NV_PVLIST] = \
568 bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
569 filter_allocatable=False)
571 if constants.NV_VERSION in what:
572 result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
573 constants.RELEASE_VERSION)
575 if constants.NV_HVINFO in what:
576 hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
577 result[constants.NV_HVINFO] = hyper.GetNodeInfo()
579 if constants.NV_DRBDLIST in what:
581 used_minors = bdev.DRBD8.GetUsedDevs().keys()
582 except errors.BlockDeviceError, err:
583 logging.warning("Can't get used minors list", exc_info=True)
584 used_minors = str(err)
585 result[constants.NV_DRBDLIST] = used_minors
587 if constants.NV_NODESETUP in what:
588 result[constants.NV_NODESETUP] = tmpr = []
589 if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
590 tmpr.append("The sysfs filesytem doesn't seem to be mounted"
591 " under /sys, missing required directories /sys/block"
592 " and /sys/class/net")
593 if (not os.path.isdir("/proc/sys") or
594 not os.path.isfile("/proc/sysrq-trigger")):
595 tmpr.append("The procfs filesystem doesn't seem to be mounted"
596 " under /proc, missing required directory /proc/sys and"
597 " the file /proc/sysrq-trigger")
599 if constants.NV_TIME in what:
600 result[constants.NV_TIME] = utils.SplitTime(time.time())
602 if constants.NV_OSLIST in what:
603 result[constants.NV_OSLIST] = DiagnoseOS()
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # see lvs(8) for the meaning of the lv_attr character positions
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[name] = (size, inactive, online)

  return lvs
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
def ListVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    # strip the "(extent)" suffix from a device specification
    return dev.split('(')[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{'name': line[0], 'size': line[1],
             'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count('|') >= 3:
      all_devs.extend(map_line(line.split('|')))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
725 def GetInstanceList(hypervisor_list):
726 """Provides a list of instances.
728 @type hypervisor_list: list
729 @param hypervisor_list: the list of hypervisors to query information
732 @return: a list of all running instances on the current node
733 - instance1.example.com
734 - instance2.example.com
738 for hname in hypervisor_list:
740 names = hypervisor.GetHypervisor(hname).ListInstances()
741 results.extend(names)
742 except errors.HypervisorError, err:
743 _Fail("Error enumerating instances (hypervisor %s): %s",
744 hname, err, exc=True)
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      # missing disk symlinks mean the instance predates the symlink scheme
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output
def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  base = ("%s-%s-%s-%s.log" %
          (kind, os_name, instance, utils.TimestampForFilename()))
  return utils.PathJoin(constants.LOG_OS_DIR, base)
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    # log=False because the failure details were already logged above
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    logging.error("os create command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    # log=False because the failure details were already logged above
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
926 def _GetVGInfo(vg_name):
927 """Get information about the volume group.
930 @param vg_name: the volume group which we query
933 A dictionary with the following keys:
934 - C{vg_size} is the total size of the volume group in MiB
935 - C{vg_free} is the free size of the volume group in MiB
936 - C{pv_count} are the number of physical disks in that VG
938 If an error occurs during gathering of data, we return the same dict
939 with keys all set to None.
942 retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
944 retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
945 "--nosuffix", "--units=m", "--separator=:", vg_name])
948 logging.error("volume group %s not present", vg_name)
950 valarr = retval.stdout.strip().rstrip(':').split(':')
954 "vg_size": int(round(float(valarr[0]), 0)),
955 "vg_free": int(round(float(valarr[1]), 0)),
956 "pv_count": int(valarr[2]),
958 except (TypeError, ValueError), err:
959 logging.exception("Fail to parse vgs output: %s", err)
961 logging.error("vgs output has the wrong number of fields (expected"
962 " three): %s", str(valarr))
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for disk C{idx} of C{instance_name}."""
  return utils.PathJoin(constants.DISK_LINKS_DIR,
                        "%s:%d" % (instance_name, idx))
971 def _SymlinkBlockDev(instance_name, device_path, idx):
972 """Set up symlinks to a instance's block device.
974 This is an auxiliary function run when an instance is start (on the primary
975 node) or when an instance is migrated (on the target node).
978 @param instance_name: the name of the target instance
979 @param device_path: path of the physical block device, on the node
980 @param idx: the disk index
981 @return: absolute path to the disk's symlink
984 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
986 os.symlink(device_path, link_name)
988 if err.errno == errno.EEXIST:
989 if (not os.path.islink(link_name) or
990 os.readlink(link_name) != device_path):
992 os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        # best-effort cleanup: log and continue with the remaining links
        logging.exception("Can't remove symlink '%s'", link_name)
1012 def _GatherAndLinkBlockDevs(instance):
1013 """Set up an instance's block device(s).
1015 This is run on the primary node at instance startup. The block
1016 devices must be already assembled.
1018 @type instance: L{objects.Instance}
1019 @param instance: the instance whose disks we shoul assemble
1021 @return: list of (disk_object, device_path)
1025 for idx, disk in enumerate(instance.disks):
1026 device = _RecursiveFindBD(disk)
1028 raise errors.BlockDeviceError("Block device '%s' is not set up." %
1032 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1034 raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1037 block_devices.append((disk, link_name))
1039 return block_devices
1042 def StartInstance(instance):
1043 """Start an instance.
1045 @type instance: L{objects.Instance}
1046 @param instance: the instance object
1050 running_instances = GetInstanceList([instance.hypervisor])
1052 if instance.name in running_instances:
1053 logging.info("Instance %s already running, not starting", instance.name)
1057 block_devices = _GatherAndLinkBlockDevs(instance)
1058 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1059 hyper.StartInstance(instance, block_devices)
1060 except errors.BlockDeviceError, err:
1061 _Fail("Block device error: %s", err, exc=True)
1062 except errors.HypervisorError, err:
1063 _RemoveBlockDevLinks(instance.name, instance.disks)
1064 _Fail("Hypervisor error: %s", err, exc=True)
1067 def InstanceShutdown(instance, timeout):
1068 """Shut an instance down.
1070 @note: this functions uses polling with a hardcoded timeout.
1072 @type instance: L{objects.Instance}
1073 @param instance: the instance object
1074 @type timeout: integer
1075 @param timeout: maximum timeout for soft shutdown
1079 hv_name = instance.hypervisor
1080 hyper = hypervisor.GetHypervisor(hv_name)
1081 iname = instance.name
1083 if instance.name not in hyper.ListInstances():
1084 logging.info("Instance %s not running, doing nothing", iname)
1089 self.tried_once = False
1092 if iname not in hyper.ListInstances():
1096 hyper.StopInstance(instance, retry=self.tried_once)
1097 except errors.HypervisorError, err:
1098 if iname not in hyper.ListInstances():
1099 # if the instance is no longer existing, consider this a
1100 # success and go to cleanup
1103 _Fail("Failed to stop instance %s: %s", iname, err)
1105 self.tried_once = True
1107 raise utils.RetryAgain()
1110 utils.Retry(_TryShutdown(), 5, timeout)
1111 except utils.RetryTimeout:
1112 # the shutdown did not succeed
1113 logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1116 hyper.StopInstance(instance, force=True)
1117 except errors.HypervisorError, err:
1118 if iname in hyper.ListInstances():
1119 # only raise an error if the instance still exists, otherwise
1120 # the error could simply be "instance ... unknown"!
1121 _Fail("Failed to force stop instance %s: %s", iname, err)
1125 if iname in hyper.ListInstances():
1126 _Fail("Could not shutdown instance %s even by destroy", iname)
1129 hyper.CleanupInstance(instance.name)
1130 except errors.HypervisorError, err:
1131 logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1133 _RemoveBlockDevLinks(iname, instance.disks)
1136 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1137 """Reboot an instance.
1139 @type instance: L{objects.Instance}
1140 @param instance: the instance object to reboot
1141 @type reboot_type: str
1142 @param reboot_type: the type of reboot, one the following
1144 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1145 instance OS, do not recreate the VM
1146 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1147 restart the VM (at the hypervisor level)
1148 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1149 not accepted here, since that mode is handled differently, in
1150 cmdlib, and translates into full stop and start of the
1151 instance (instead of a call_instance_reboot RPC)
1152 @type shutdown_timeout: integer
1153 @param shutdown_timeout: maximum timeout for soft shutdown
1157 running_instances = GetInstanceList([instance.hypervisor])
1159 if instance.name not in running_instances:
1160 _Fail("Cannot reboot instance %s that is not running", instance.name)
1162 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1163 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1165 hyper.RebootInstance(instance)
1166 except errors.HypervisorError, err:
1167 _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1168 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1170 InstanceShutdown(instance, shutdown_timeout)
1171 return StartInstance(instance)
1172 except errors.HypervisorError, err:
1173 _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1175 _Fail("Invalid reboot_type received: %s", reboot_type)
1178 def MigrationInfo(instance):
1179 """Gather information about an instance to be migrated.
1181 @type instance: L{objects.Instance}
1182 @param instance: the instance definition
1185 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1187 info = hyper.MigrationInfo(instance)
1188 except errors.HypervisorError, err:
1189 _Fail("Failed to fetch migration information: %s", err, exc=True)
1193 def AcceptInstance(instance, info, target):
1194 """Prepare the node to accept an instance.
1196 @type instance: L{objects.Instance}
1197 @param instance: the instance definition
1198 @type info: string/data (opaque)
1199 @param info: migration information, from the source node
1200 @type target: string
1201 @param target: target host (usually ip), on this node
1204 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1206 hyper.AcceptInstance(instance, info, target)
1207 except errors.HypervisorError, err:
1208 _Fail("Failed to accept instance: %s", err, exc=True)
1211 def FinalizeMigration(instance, info, success):
1212 """Finalize any preparation to accept an instance.
1214 @type instance: L{objects.Instance}
1215 @param instance: the instance definition
1216 @type info: string/data (opaque)
1217 @param info: migration information, from the source node
1218 @type success: boolean
1219 @param success: whether the migration was a success or a failure
1222 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1224 hyper.FinalizeMigration(instance, info, success)
1225 except errors.HypervisorError, err:
1226 _Fail("Failed to finalize migration: %s", err, exc=True)
1229 def MigrateInstance(instance, target, live):
1230 """Migrates an instance to another node.
1232 @type instance: L{objects.Instance}
1233 @param instance: the instance definition
1234 @type target: string
1235 @param target: the target node name
1237 @param live: whether the migration should be done live or not (the
1238 interpretation of this parameter is left to the hypervisor)
1240 @return: a tuple of (success, msg) where:
1241 - succes is a boolean denoting the success/failure of the operation
1242 - msg is a string with details in case of failure
1245 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1248 hyper.MigrateInstance(instance, target, live)
1249 except errors.HypervisorError, err:
1250 _Fail("Failed to migrate instance: %s", err, exc=True)
1253 def BlockdevCreate(disk, size, owner, on_primary, info):
1254 """Creates a block device for an instance.
1256 @type disk: L{objects.Disk}
1257 @param disk: the object describing the disk we should create
1259 @param size: the size of the physical underlying device, in MiB
1261 @param owner: the name of the instance for which disk is created,
1262 used for device cache data
1263 @type on_primary: boolean
1264 @param on_primary: indicates if it is the primary node or not
1266 @param info: string that will be sent to the physical device
1267 creation, used for example to set (LVM) tags on LVs
1269 @return: the new unique_id of the device (this can sometime be
1270 computed only after creation), or None. On secondary nodes,
1271 it's not required to return anything.
1274 # TODO: remove the obsolete 'size' argument
1275 # pylint: disable-msg=W0613
1278 for child in disk.children:
1280 crdev = _RecursiveAssembleBD(child, owner, on_primary)
1281 except errors.BlockDeviceError, err:
1282 _Fail("Can't assemble device %s: %s", child, err)
1283 if on_primary or disk.AssembleOnSecondary():
1284 # we need the children open in case the device itself has to
1287 # pylint: disable-msg=E1103
1289 except errors.BlockDeviceError, err:
1290 _Fail("Can't make child '%s' read-write: %s", child, err)
1294 device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1295 except errors.BlockDeviceError, err:
1296 _Fail("Can't create block device: %s", err)
1298 if on_primary or disk.AssembleOnSecondary():
1301 except errors.BlockDeviceError, err:
1302 _Fail("Can't assemble device after creation, unusual event: %s", err)
1303 device.SetSyncSpeed(constants.SYNC_SPEED)
1304 if on_primary or disk.OpenOnSecondary():
1306 device.Open(force=True)
1307 except errors.BlockDeviceError, err:
1308 _Fail("Can't make device r/w after creation, unusual event: %s", err)
1309 DevCacheManager.UpdateCache(device.dev_path, owner,
1310 on_primary, disk.iv_name)
1312 device.SetInfo(info)
1314 return device.unique_id
1317 def BlockdevRemove(disk):
1318 """Remove a block device.
1320 @note: This is intended to be called recursively.
1322 @type disk: L{objects.Disk}
1323 @param disk: the disk object we should remove
1325 @return: the success of the operation
1330 rdev = _RecursiveFindBD(disk)
1331 except errors.BlockDeviceError, err:
1332 # probably can't attach
1333 logging.info("Can't attach to device %s in remove", disk)
1335 if rdev is not None:
1336 r_path = rdev.dev_path
1339 except errors.BlockDeviceError, err:
1340 msgs.append(str(err))
1342 DevCacheManager.RemoveCache(r_path)
1345 for child in disk.children:
1347 BlockdevRemove(child)
1348 except RPCFail, err:
1349 msgs.append(str(err))
1352 _Fail("; ".join(msgs))
1355 def _RecursiveAssembleBD(disk, owner, as_primary):
1356 """Activate a block device for an instance.
1358 This is run on the primary and secondary nodes for an instance.
1360 @note: this function is called recursively.
1362 @type disk: L{objects.Disk}
1363 @param disk: the disk we try to assemble
1365 @param owner: the name of the instance which owns the disk
1366 @type as_primary: boolean
1367 @param as_primary: if we should make the block device
1370 @return: the assembled device or None (in case no device
1372 @raise errors.BlockDeviceError: in case there is an error
1373 during the activation of the children or the device
1379 mcn = disk.ChildrenNeeded()
1381 mcn = 0 # max number of Nones allowed
1383 mcn = len(disk.children) - mcn # max number of Nones
1384 for chld_disk in disk.children:
1386 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1387 except errors.BlockDeviceError, err:
1388 if children.count(None) >= mcn:
1391 logging.error("Error in child activation (but continuing): %s",
1393 children.append(cdev)
1395 if as_primary or disk.AssembleOnSecondary():
1396 r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1397 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1399 if as_primary or disk.OpenOnSecondary():
1401 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1402 as_primary, disk.iv_name)
1409 def BlockdevAssemble(disk, owner, as_primary):
1410 """Activate a block device for an instance.
1412 This is a wrapper over _RecursiveAssembleBD.
1414 @rtype: str or boolean
1415 @return: a C{/dev/...} path for primary nodes, and
1416 C{True} for secondary nodes
1420 result = _RecursiveAssembleBD(disk, owner, as_primary)
1421 if isinstance(result, bdev.BlockDev):
1422 # pylint: disable-msg=E1103
1423 result = result.dev_path
1424 except errors.BlockDeviceError, err:
1425 _Fail("Error while assembling disk: %s", err, exc=True)
1430 def BlockdevShutdown(disk):
1431 """Shut down a block device.
1433 First, if the device is assembled (Attach() is successful), then
1434 the device is shutdown. Then the children of the device are
1437 This function is called recursively. Note that we don't cache the
1438 children or such, as oppossed to assemble, shutdown of different
1439 devices doesn't require that the upper device was active.
1441 @type disk: L{objects.Disk}
1442 @param disk: the description of the disk we should
1448 r_dev = _RecursiveFindBD(disk)
1449 if r_dev is not None:
1450 r_path = r_dev.dev_path
1453 DevCacheManager.RemoveCache(r_path)
1454 except errors.BlockDeviceError, err:
1455 msgs.append(str(err))
1458 for child in disk.children:
1460 BlockdevShutdown(child)
1461 except RPCFail, err:
1462 msgs.append(str(err))
1465 _Fail("; ".join(msgs))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      # device has no fixed path; look it up at runtime
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)
    stats.append(rbd.CombinedSyncStatus())
  return stats
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk
1576 def BlockdevFind(disk):
1577 """Check if a device is activated.
1579 If it is, return information about the real device.
1581 @type disk: L{objects.Disk}
1582 @param disk: the disk to find
1583 @rtype: None or objects.BlockDevStatus
1584 @return: None if the disk cannot be found, otherwise a the current
1589 rbd = _RecursiveFindBD(disk)
1590 except errors.BlockDeviceError, err:
1591 _Fail("Failed to find device: %s", err, exc=True)
1596 return rbd.GetSyncStatus()
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)
1714 def _ErrnoOrStr(err):
1715 """Format an EnvironmentError exception.
1717 If the L{err} argument has an errno attribute, it will be looked up
1718 and converted into a textual C{E...} description. Otherwise the
1719 string representation of the error will be returned.
1721 @type err: L{EnvironmentError}
1722 @param err: the exception to format
1725 if hasattr(err, 'errno'):
1726 detail = errno.errorcode[err.errno]
1732 def _OSOndiskAPIVersion(os_dir):
1733 """Compute and return the API version of a given OS.
1735 This function will try to read the API version of the OS residing in
1736 the 'os_dir' directory.
1739 @param os_dir: the directory in which we should look for the OS
1741 @return: tuple (status, data) with status denoting the validity and
1742 data holding either the vaid versions or an error message
1745 api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1748 st = os.stat(api_file)
1749 except EnvironmentError, err:
1750 return False, ("Required file '%s' not found under path %s: %s" %
1751 (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1753 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1754 return False, ("File '%s' in %s is not a regular file" %
1755 (constants.OS_API_FILE, os_dir))
1758 api_versions = utils.ReadFile(api_file).splitlines()
1759 except EnvironmentError, err:
1760 return False, ("Error while reading the API version file at %s: %s" %
1761 (api_file, _ErrnoOrStr(err)))
1764 api_versions = [int(version.strip()) for version in api_versions]
1765 except (TypeError, ValueError), err:
1766 return False, ("API version(s) can't be converted to integer: %s" %
1769 return True, api_versions
1772 def DiagnoseOS(top_dirs=None):
1773 """Compute the validity for all OSes.
1775 @type top_dirs: list
1776 @param top_dirs: the list of directories in which to
1777 search (if not given defaults to
1778 L{constants.OS_SEARCH_PATH})
1779 @rtype: list of L{objects.OS}
1780 @return: a list of tuples (name, path, status, diagnose, variants,
1781 parameters, api_version) for all (potential) OSes under all
1782 search paths, where:
1783 - name is the (potential) OS name
1784 - path is the full path to the OS
1785 - status True/False is the validity of the OS
1786 - diagnose is the error message for an invalid OS, otherwise empty
1787 - variants is a list of supported OS variants, if any
1788 - parameters is a list of (name, help) parameters, if any
1789 - api_version is a list of support OS API versions
1792 if top_dirs is None:
1793 top_dirs = constants.OS_SEARCH_PATH
1796 for dir_name in top_dirs:
1797 if os.path.isdir(dir_name):
1799 f_names = utils.ListVisibleFiles(dir_name)
1800 except EnvironmentError, err:
1801 logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1803 for name in f_names:
1804 os_path = utils.PathJoin(dir_name, name)
1805 status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1808 variants = os_inst.supported_variants
1809 parameters = os_inst.supported_parameters
1810 api_versions = os_inst.api_versions
1813 variants = parameters = api_versions = []
1814 result.append((name, os_path, status, diagnose, variants,
1815 parameters, api_versions))
1820 def _TryOSFromDisk(name, base_dir=None):
1821 """Create an OS instance from disk.
1823 This function will return an OS instance if the given name is a
1826 @type base_dir: string
1827 @keyword base_dir: Base directory containing OS installations.
1828 Defaults to a search in all the OS_SEARCH_PATH dirs.
1830 @return: success and either the OS instance if we find a valid one,
1834 if base_dir is None:
1835 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1837 os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1840 return False, "Directory for OS %s not found in search path" % name
1842 status, api_versions = _OSOndiskAPIVersion(os_dir)
1845 return status, api_versions
1847 if not constants.OS_API_VERSIONS.intersection(api_versions):
1848 return False, ("API version mismatch for path '%s': found %s, want %s." %
1849 (os_dir, api_versions, constants.OS_API_VERSIONS))
1851 # OS Files dictionary, we will populate it with the absolute path names
1852 os_files = dict.fromkeys(constants.OS_SCRIPTS)
1854 if max(api_versions) >= constants.OS_API_V15:
1855 os_files[constants.OS_VARIANTS_FILE] = ''
1857 if max(api_versions) >= constants.OS_API_V20:
1858 os_files[constants.OS_PARAMETERS_FILE] = ''
1860 del os_files[constants.OS_SCRIPT_VERIFY]
1862 for filename in os_files:
1863 os_files[filename] = utils.PathJoin(os_dir, filename)
1866 st = os.stat(os_files[filename])
1867 except EnvironmentError, err:
1868 return False, ("File '%s' under path '%s' is missing (%s)" %
1869 (filename, os_dir, _ErrnoOrStr(err)))
1871 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1872 return False, ("File '%s' under path '%s' is not a regular file" %
1875 if filename in constants.OS_SCRIPTS:
1876 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1877 return False, ("File '%s' under path '%s' is not executable" %
1881 if constants.OS_VARIANTS_FILE in os_files:
1882 variants_file = os_files[constants.OS_VARIANTS_FILE]
1884 variants = utils.ReadFile(variants_file).splitlines()
1885 except EnvironmentError, err:
1886 return False, ("Error while reading the OS variants file at %s: %s" %
1887 (variants_file, _ErrnoOrStr(err)))
1889 return False, ("No supported os variant found")
1892 if constants.OS_PARAMETERS_FILE in os_files:
1893 parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1895 parameters = utils.ReadFile(parameters_file).splitlines()
1896 except EnvironmentError, err:
1897 return False, ("Error while reading the OS parameters file at %s: %s" %
1898 (parameters_file, _ErrnoOrStr(err)))
1899 parameters = [v.split(None, 1) for v in parameters]
1901 os_obj = objects.OS(name=name, path=os_dir,
1902 create_script=os_files[constants.OS_SCRIPT_CREATE],
1903 export_script=os_files[constants.OS_SCRIPT_EXPORT],
1904 import_script=os_files[constants.OS_SCRIPT_IMPORT],
1905 rename_script=os_files[constants.OS_SCRIPT_RENAME],
1906 verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1908 supported_variants=variants,
1909 supported_parameters=parameters,
1910 api_versions=api_versions)
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # strip a possible "+variant" suffix before the disk lookup
  name_only = name.split("+", 1)[0]
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload
def OSCoreEnv(inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['OS_NAME'] = inst_os.name
  result['DEBUG_LEVEL'] = '%d' % debug

  # OS variants
  if api_version >= constants.OS_API_V15:
    try:
      variant = inst_os.name.split('+', 1)[1]
    except IndexError:
      # no "+variant" suffix; fall back to the first supported variant
      variant = inst_os.supported_variants[0]
    result['OS_VARIANT'] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result['OSP_%s' % pname.upper()] = pvalue

  return result
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(inst_os, instance.osparams, debug=debug)

  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
2037 def BlockdevGrow(disk, amount):
2038 """Grow a stack of block devices.
2040 This function is called recursively, with the childrens being the
2041 first ones to resize.
2043 @type disk: L{objects.Disk}
2044 @param disk: the disk to be grown
2045 @rtype: (status, result)
2046 @return: a tuple with the status of the operation
2047 (True/False), and the errors message if status
2051 r_dev = _RecursiveFindBD(disk)
2053 _Fail("Cannot find block device %s", disk)
2057 except errors.BlockDeviceError, err:
2058 _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    # snapshot the backing storage, not the DRBD device itself
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
                 '%s' % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
  else:
    _Fail("No exports directory")
2207 def RemoveExport(export):
2208 """Remove an existing export from the node.
2211 @param export: the name of the export to remove
2215 target = utils.PathJoin(constants.EXPORT_DIR, export)
2218 shutil.rmtree(target)
2219 except EnvironmentError, err:
2220 _Fail("Error while removing the export: %s", err, exc=True)
2223 def BlockdevRename(devlist):
2224 """Rename a list of block devices.
2226 @type devlist: list of tuples
2227 @param devlist: list of tuples of the form (disk,
2228 new_logical_id, new_physical_id); disk is an
2229 L{objects.Disk} object describing the current disk,
2230 and new logical_id/physical_id is the name we
2233 @return: True if all renames succeeded, False otherwise
2238 for disk, unique_id in devlist:
2239 dev = _RecursiveFindBD(disk)
2241 msgs.append("Can't find device %s in rename" % str(disk))
2245 old_rpath = dev.dev_path
2246 dev.Rename(unique_id)
2247 new_rpath = dev.dev_path
2248 if old_rpath != new_rpath:
2249 DevCacheManager.RemoveCache(old_rpath)
2250 # FIXME: we should add the new cache information here, like:
2251 # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2252 # but we don't have the owner here - maybe parse from existing
2253 # cache? for now, we only lose lvm data when we rename, which
2254 # is less critical than DRBD or MD
2255 except errors.BlockDeviceError, err:
2256 msgs.append("Can't rename device '%s' to '%s': %s" %
2257 (dev, unique_id, err))
2258 logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2261 _Fail("; ".join(msgs))
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks wheter the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
      base_file_storage_dir):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
2289 def CreateFileStorageDir(file_storage_dir):
2290 """Create file storage directory.
2292 @type file_storage_dir: str
2293 @param file_storage_dir: directory to create
2296 @return: tuple with first element a boolean indicating wheter dir
2297 creation was successful or not
2300 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2301 if os.path.exists(file_storage_dir):
2302 if not os.path.isdir(file_storage_dir):
2303 _Fail("Specified storage dir '%s' is not a directory",
2307 os.makedirs(file_storage_dir, 0750)
2308 except OSError, err:
2309 _Fail("Cannot create file storage directory '%s': %s",
2310 file_storage_dir, err, exc=True)
2313 def RemoveFileStorageDir(file_storage_dir):
2314 """Remove file storage directory.
2316 Remove it only if it's empty. If not log an error and return.
2318 @type file_storage_dir: str
2319 @param file_storage_dir: the directory we should cleanup
2320 @rtype: tuple (success,)
2321 @return: tuple of one element, C{success}, denoting
2322 whether the operation was successful
2325 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2326 if os.path.exists(file_storage_dir):
2327 if not os.path.isdir(file_storage_dir):
2328 _Fail("Specified Storage directory '%s' is not a directory",
2330 # deletes dir only if empty, otherwise we want to fail the rpc call
2332 os.rmdir(file_storage_dir)
2333 except OSError, err:
2334 _Fail("Cannot remove file storage directory '%s': %s",
2335 file_storage_dir, err)
2338 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2339 """Rename the file storage directory.
2341 @type old_file_storage_dir: str
2342 @param old_file_storage_dir: the current path
2343 @type new_file_storage_dir: str
2344 @param new_file_storage_dir: the name we should rename to
2345 @rtype: tuple (success,)
2346 @return: tuple of one element, C{success}, denoting
2347 whether the operation was successful
2350 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2351 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2352 if not os.path.exists(new_file_storage_dir):
2353 if os.path.isdir(old_file_storage_dir):
2355 os.rename(old_file_storage_dir, new_file_storage_dir)
2356 except OSError, err:
2357 _Fail("Cannot rename '%s' to '%s': %s",
2358 old_file_storage_dir, new_file_storage_dir, err)
2360 _Fail("Specified storage dir '%s' is not a directory",
2361 old_file_storage_dir)
2363 if os.path.exists(old_file_storage_dir):
2364 _Fail("Cannot rename '%s' to '%s': both locations exist",
2365 old_file_storage_dir, new_file_storage_dir)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check

  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  # only reject when the file is outside the queue directory; the
  # original fragment called _Fail unconditionally due to a missing guard
  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking that the target lies inside the queue directory.

  @type file_name: str
  @param file_name: the job file name
  @param content: the new job contents; passed through L{_Decompress}
      before being written, so it may be compressed
  @rtype: boolean
  @return: the success of the operation

  """
  # raises RPCFail if file_name is outside constants.QUEUE_DIR
  _EnsureJobQueueFile(file_name)

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  # both source and destination must be inside the queue directory
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  # mkdir=True creates missing parent directories for the destination
  utils.RenameFile(old, new, mkdir=True)
2424 def BlockdevClose(instance_name, disks):
2425 """Closes the given block devices.
2427 This means they will be switched to secondary mode (in case of
2430 @param instance_name: if the argument is not empty, the symlinks
2431 of this instance will be removed
2432 @type disks: list of L{objects.Disk}
2433 @param disks: the list of disks to be closed
2434 @rtype: tuple (success, message)
2435 @return: a tuple of success and message, where success
2436 indicates the succes of the operation, and message
2437 which will contain the error details in case we
2443 rd = _RecursiveFindBD(cf)
2445 _Fail("Can't find device %s", cf)
2452 except errors.BlockDeviceError, err:
2453 msg.append(str(err))
2455 _Fail("Can't make devices secondary: %s", ",".join(msg))
2458 _RemoveBlockDevLinks(instance_name, disks)
2461 def ValidateHVParams(hvname, hvparams):
2462 """Validates the given hypervisor parameters.
2464 @type hvname: string
2465 @param hvname: the hypervisor name
2466 @type hvparams: dict
2467 @param hvparams: the hypervisor parameters to be validated
2472 hv_type = hypervisor.GetHypervisor(hvname)
2473 hv_type.ValidateParameters(hvparams)
2474 except errors.HypervisorError, err:
2475 _Fail(str(err), log=False)
2478 def _CheckOSPList(os_obj, parameters):
2479 """Check whether a list of parameters is supported by the OS.
2481 @type os_obj: L{objects.OS}
2482 @param os_obj: OS object to check
2483 @type parameters: list
2484 @param parameters: the list of parameters to check
2487 supported = [v[0] for v in os_obj.supported_parameters]
2488 delta = frozenset(parameters).difference(supported)
2490 _Fail("The following parameters are not supported"
2491 " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failing the operation
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  # variants ("os+variant") share the on-disk definition of the base OS
  name_only = osname.split("+", 1)[0]
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      # tbv holds the error string when status is False
      _Fail(tbv)
    else:
      return False

  # NOTE(review): pre-2.0 API OSes have no parameter/validation support,
  # so there is nothing more to check for them
  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True
2543 """Demotes the current node from master candidate role.
2546 # try to ensure we're not the master by mistake
2547 master, myself = ssconf.GetMasterAndMyself()
2548 if master == myself:
2549 _Fail("ssconf status shows I'm the master node, will not demote")
2551 result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2552 if not result.failed:
2553 _Fail("The master daemon is running, will not demote")
2556 if os.path.isfile(constants.CLUSTER_CONF_FILE):
2557 utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2558 except EnvironmentError, err:
2559 if err.errno != errno.ENOENT:
2560 _Fail("Error while backing up cluster file: %s", err, exc=True)
2562 utils.RemoveFile(constants.CLUSTER_CONF_FILE)
def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  @type cryptodir: str
  @param cryptodir: base directory holding one subdirectory per certificate
  @type name: str
  @param name: certificate subdirectory name
  @rtype: tuple; (string, string, string)
  @return: (certificate directory, key file path, certificate file path)

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2574 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2575 """Creates a new X509 certificate for SSL/TLS.
2578 @param validity: Validity in seconds
2579 @rtype: tuple; (string, string)
2580 @return: Certificate name and public part
2583 (key_pem, cert_pem) = \
2584 utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
2585 min(validity, _MAX_SSL_CERT_VALIDITY))
2587 cert_dir = tempfile.mkdtemp(dir=cryptodir,
2588 prefix="x509-%s-" % utils.TimestampForFilename())
2590 name = os.path.basename(cert_dir)
2591 assert len(name) > 5
2593 (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2595 utils.WriteFile(key_file, mode=0400, data=key_pem)
2596 utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2598 # Never return private key as it shouldn't leave the node
2599 return (name, cert_pem)
2601 shutil.rmtree(cert_dir, ignore_errors=True)
2605 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2606 """Removes a X509 certificate.
2609 @param name: Certificate name
2612 (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2614 utils.RemoveFile(key_file)
2615 utils.RemoveFile(cert_file)
2619 except EnvironmentError, err:
2620 _Fail("Cannot remove certificate directory '%s': %s",
2624 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2625 """Returns the command for the requested input/output.
2627 @type instance: L{objects.Instance}
2628 @param instance: The instance object
2629 @param mode: Import/export mode
2630 @param ieio: Input/output type
2631 @param ieargs: Input/output arguments
2634 assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2641 if ieio == constants.IEIO_FILE:
2642 (filename, ) = ieargs
2644 if not utils.IsNormAbsPath(filename):
2645 _Fail("Path '%s' is not normalized or absolute", filename)
2647 directory = os.path.normpath(os.path.dirname(filename))
2649 if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2650 constants.EXPORT_DIR):
2651 _Fail("File '%s' is not under exports directory '%s'",
2652 filename, constants.EXPORT_DIR)
2655 utils.Makedirs(directory, mode=0750)
2657 quoted_filename = utils.ShellQuote(filename)
2659 if mode == constants.IEM_IMPORT:
2660 suffix = "> %s" % quoted_filename
2661 elif mode == constants.IEM_EXPORT:
2662 suffix = "< %s" % quoted_filename
2664 # Retrieve file size
2666 st = os.stat(filename)
2667 except EnvironmentError, err:
2668 logging.error("Can't stat(2) %s: %s", filename, err)
2670 exp_size = utils.BytesToMebibyte(st.st_size)
2672 elif ieio == constants.IEIO_RAW_DISK:
2675 real_disk = _OpenRealBD(disk)
2677 if mode == constants.IEM_IMPORT:
2678 # we set here a smaller block size as, due to transport buffering, more
2679 # than 64-128k will mostly ignored; we use nocreat to fail if the device
2680 # is not already there or we pass a wrong path; we use notrunc to no
2681 # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2682 # much memory; this means that at best, we flush every 64k, which will
2684 suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2685 " bs=%s oflag=dsync"),
2689 elif mode == constants.IEM_EXPORT:
2690 # the block size on the read dd is 1MiB to match our units
2691 prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2693 str(1024 * 1024), # 1 MB
2695 exp_size = disk.size
2697 elif ieio == constants.IEIO_SCRIPT:
2698 (disk, disk_index, ) = ieargs
2700 assert isinstance(disk_index, (int, long))
2702 real_disk = _OpenRealBD(disk)
2704 inst_os = OSFromDisk(instance.os)
2705 env = OSEnvironment(instance, inst_os)
2707 if mode == constants.IEM_IMPORT:
2708 env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2709 env["IMPORT_INDEX"] = str(disk_index)
2710 script = inst_os.import_script
2712 elif mode == constants.IEM_EXPORT:
2713 env["EXPORT_DEVICE"] = real_disk.dev_path
2714 env["EXPORT_INDEX"] = str(disk_index)
2715 script = inst_os.export_script
2717 # TODO: Pass special environment only to script
2718 script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2720 if mode == constants.IEM_IMPORT:
2721 suffix = "| %s" % script_cmd
2723 elif mode == constants.IEM_EXPORT:
2724 prefix = "%s |" % script_cmd
2726 # Let script predict size
2727 exp_size = constants.IE_CUSTOM_SIZE
2730 _Fail("Invalid %s I/O mode %r", mode, ieio)
2732 return (env, prefix, suffix, exp_size)
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  @type prefix: string
  @param prefix: directory name prefix (e.g. "import" or "export")
  @rtype: string
  @return: path of the newly created status directory

  """
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))
2744 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2745 """Starts an import or export daemon.
2747 @param mode: Import/output mode
2748 @type opts: L{objects.ImportExportOptions}
2749 @param opts: Daemon options
2751 @param host: Remote host for export (None for import)
2753 @param port: Remote port for export (None for import)
2754 @type instance: L{objects.Instance}
2755 @param instance: Instance object
2756 @param ieio: Input/output type
2757 @param ieioargs: Input/output arguments
2760 if mode == constants.IEM_IMPORT:
2763 if not (host is None and port is None):
2764 _Fail("Can not specify host or port on import")
2766 elif mode == constants.IEM_EXPORT:
2769 if host is None or port is None:
2770 _Fail("Host and port must be specified for an export")
2773 _Fail("Invalid mode %r", mode)
2775 if (opts.key_name is None) ^ (opts.ca_pem is None):
2776 _Fail("Cluster certificate can only be used for both key and CA")
2778 (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2779 _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2781 if opts.key_name is None:
2783 key_path = constants.NODED_CERT_FILE
2784 cert_path = constants.NODED_CERT_FILE
2785 assert opts.ca_pem is None
2787 (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2789 assert opts.ca_pem is not None
2791 for i in [key_path, cert_path]:
2792 if not os.path.exists(i):
2793 _Fail("File '%s' does not exist" % i)
2795 status_dir = _CreateImportExportStatusDir(prefix)
2797 status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2798 pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2799 ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2801 if opts.ca_pem is None:
2803 ca = utils.ReadFile(constants.NODED_CERT_FILE)
2808 utils.WriteFile(ca_file, data=ca, mode=0400)
2811 constants.IMPORT_EXPORT_DAEMON,
2813 "--key=%s" % key_path,
2814 "--cert=%s" % cert_path,
2815 "--ca=%s" % ca_file,
2819 cmd.append("--host=%s" % host)
2822 cmd.append("--port=%s" % port)
2825 cmd.append("--compress=%s" % opts.compress)
2828 cmd.append("--magic=%s" % opts.magic)
2830 if exp_size is not None:
2831 cmd.append("--expected-size=%s" % exp_size)
2834 cmd.append("--cmd-prefix=%s" % cmd_prefix)
2837 cmd.append("--cmd-suffix=%s" % cmd_suffix)
2839 logfile = _InstanceLogName(prefix, instance.os, instance.name)
2841 # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2842 # support for receiving a file descriptor for output
2843 utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2846 # The import/export name is simply the status directory name
2847 return os.path.basename(status_dir)
2850 shutil.rmtree(status_dir, ignore_errors=True)
2854 def GetImportExportStatus(names):
2855 """Returns import/export daemon status.
2857 @type names: sequence
2858 @param names: List of names
2859 @rtype: List of dicts
2860 @return: Returns a list of the state of each named import/export or None if a
2861 status couldn't be read
2867 status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2871 data = utils.ReadFile(status_file)
2872 except EnvironmentError, err:
2873 if err.errno != errno.ENOENT:
2881 result.append(serializer.LoadJson(data))
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  @type name: string
  @param name: the import/export name (status directory basename)

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  # pid is falsy if the daemon is not running anymore
  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  @type name: string
  @param name: the import/export name (status directory basename)

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @param nodes_ip: mapping used by L{objects.Disk.SetPhysicalID}
  @param disks: list of L{objects.Disk} objects
  @return: list of found block devices
  @raise RPCFail: if any disk cannot be found

  """
  # set the correct physical ID
  my_name = utils.HostInfo().name
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs
2941 def DrbdDisconnectNet(nodes_ip, disks):
2942 """Disconnects the network on a list of drbd devices.
2945 bdevs = _FindDisks(nodes_ip, disks)
2951 except errors.BlockDeviceError, err:
2952 _Fail("Can't change network configuration to standalone mode: %s",
2956 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2957 """Attaches the network on a list of drbd devices.
2960 bdevs = _FindDisks(nodes_ip, disks)
2963 for idx, rd in enumerate(bdevs):
2965 _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2966 except EnvironmentError, err:
2967 _Fail("Can't create symlink: %s", err)
2968 # reconnect disks, switch to new master configuration and if
2969 # needed primary mode
2972 rd.AttachNet(multimaster)
2973 except errors.BlockDeviceError, err:
2974 _Fail("Can't change network configuration: %s", err)
2976 # wait until the disks are connected; we need to retry the re-attach
2977 # if the device becomes standalone, as this might happen if the one
2978 # node disconnects and reconnects in a different mode before the
2979 # other node reconnects; in this case, one or both of the nodes will
2980 # decide it has wrong configuration and switch to standalone
2983 all_connected = True
2986 stats = rd.GetProcStatus()
2988 all_connected = (all_connected and
2989 (stats.is_connected or stats.is_in_resync))
2991 if stats.is_standalone:
2992 # peer had different config info and this node became
2993 # standalone, even though this should not happen with the
2994 # new staged way of changing disk configs
2996 rd.AttachNet(multimaster)
2997 except errors.BlockDeviceError, err:
2998 _Fail("Can't change network configuration: %s", err)
3000 if not all_connected:
3001 raise utils.RetryAgain()
3004 # Start with a delay of 100 miliseconds and go up to 5 seconds
3005 utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3006 except utils.RetryTimeout:
3007 _Fail("Timeout in disk reconnecting")
3010 # change to primary mode
3014 except errors.BlockDeviceError, err:
3015 _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @rtype: tuple (bool, int/float)
  @return: whether all devices are done syncing, and the minimum
      sync percentage seen across the devices

  """
  def _helper(rd):
    # retry until the device is at least connected or resyncing
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check before failing
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # parent: report back to the caller immediately
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running on ram
  try:
    utils.Mlockall()
  except Exception: # pylint: disable-msg=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script
    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # the allocator reads its input from a temporary file
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
3194 class DevCacheManager(object):
3195 """Simple class for managing a cache of block device information.
3198 _DEV_PREFIX = "/dev/"
3199 _ROOT_DIR = constants.BDEV_CACHE_DIR
3202 def _ConvertPath(cls, dev_path):
3203 """Converts a /dev/name path to the cache file name.
3205 This replaces slashes with underscores and strips the /dev
3206 prefix. It then returns the full path to the cache file.
3209 @param dev_path: the C{/dev/} path name
3211 @return: the converted path name
3214 if dev_path.startswith(cls._DEV_PREFIX):
3215 dev_path = dev_path[len(cls._DEV_PREFIX):]
3216 dev_path = dev_path.replace("/", "_")
3217 fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3221 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3222 """Updates the cache information for a given device.
3225 @param dev_path: the pathname of the device
3227 @param owner: the owner (instance name) of the device
3228 @type on_primary: bool
3229 @param on_primary: whether this is the primary
3232 @param iv_name: the instance-visible name of the
3233 device, as in objects.Disk.iv_name
3238 if dev_path is None:
3239 logging.error("DevCacheManager.UpdateCache got a None dev_path")
3241 fpath = cls._ConvertPath(dev_path)
3247 iv_name = "not_visible"
3248 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3250 utils.WriteFile(fpath, data=fdata)
3251 except EnvironmentError, err:
3252 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3255 def RemoveCache(cls, dev_path):
3256 """Remove data for a dev_path.
3258 This is just a wrapper over L{utils.RemoveFile} with a converted
3259 path name and logging.
3262 @param dev_path: the pathname of the device
3267 if dev_path is None:
3268 logging.error("DevCacheManager.RemoveCache got a None dev_path")
3270 fpath = cls._ConvertPath(dev_path)
3272 utils.RemoveFile(fpath)
3273 except EnvironmentError, err:
3274 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)