4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25 the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27 in the L{_CleanDirectory} function
31 # pylint: disable-msg=E1103
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
51 from ganeti import errors
52 from ganeti import utils
53 from ganeti import ssh
54 from ganeti import hypervisor
55 from ganeti import constants
56 from ganeti import bdev
57 from ganeti import objects
58 from ganeti import ssconf
61 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
62 _ALLOWED_CLEAN_DIRS = frozenset([
64 constants.JOB_QUEUE_ARCHIVE_DIR,
# Raised (via _Fail below) when an RPC handler fails; ganeti-noded
# catches it and converts it into a 'failed' RPC reply for the master.
69 class RPCFail(Exception):
70 """Class denoting RPC failure.
72 Its argument is the error message.
77 def _Fail(msg, *args, **kwargs):
78 """Log an error and the raise an RPCFail exception.
80 This exception is then handled specially in the ganeti daemon and
81 turned into a 'failed' return type. As such, this function is a
82 useful shortcut for logging the error and returning it to the master
86 @param msg: the text of the exception
# Keyword args: log=False suppresses logging entirely; exc=True logs
# the active exception's traceback along with the message.
92 if "log" not in kwargs or kwargs["log"]: # if we should log this error
93 if "exc" in kwargs and kwargs["exc"]:
94 logging.exception(msg)
101 """Simple wrapper to return a SimpleStore.
103 @rtype: L{ssconf.SimpleStore}
104 @return: a SimpleStore instance
107 return ssconf.SimpleStore()
110 def _GetSshRunner(cluster_name):
111 """Simple wrapper to return an SshRunner.
113 @type cluster_name: str
114 @param cluster_name: the cluster name, which is needed
115 by the SshRunner constructor
116 @rtype: L{ssh.SshRunner}
117 @return: an SshRunner instance
# A fresh runner is built on each call; nothing is cached here.
120 return ssh.SshRunner(cluster_name)
123 def _Decompress(data):
124 """Unpacks data compressed by the RPC client.
126 @type data: list or tuple
127 @param data: Data sent by RPC client
129 @return: Decompressed data
# data is an (encoding, content) pair; only the NONE and ZLIB_BASE64
# encodings are understood, anything else is a programming error.
132 assert isinstance(data, (list, tuple))
133 assert len(data) == 2
134 (encoding, content) = data
135 if encoding == constants.RPC_ENCODING_NONE:
137 elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
138 return zlib.decompress(base64.b64decode(content))
140 raise AssertionError("Unknown data encoding")
143 def _CleanDirectory(path, exclude=None):
144 """Removes all regular files in a directory.
147 @param path: the directory to clean
149 @param exclude: list of files to be excluded, defaults
# Refuse to clean anything outside the whitelisted directories; this
# protects against accidental deletion elsewhere on the node.
153 if path not in _ALLOWED_CLEAN_DIRS:
154 _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
157 if not os.path.isdir(path):
162 # Normalize excluded paths
163 exclude = [os.path.normpath(i) for i in exclude]
165 for rel_name in utils.ListVisibleFiles(path):
166 full_name = utils.PathJoin(path, rel_name)
167 if full_name in exclude:
# Only plain files are removed; symlinks and subdirectories are kept.
169 if os.path.isfile(full_name) and not os.path.islink(full_name):
170 utils.RemoveFile(full_name)
173 def _BuildUploadFileList():
174 """Build the list of allowed upload files.
176 This is abstracted so that it's built only once at module import time.
179 allowed_files = set([
180 constants.CLUSTER_CONF_FILE,
182 constants.SSH_KNOWN_HOSTS_FILE,
183 constants.VNC_PASSWORD_FILE,
184 constants.RAPI_CERT_FILE,
185 constants.RAPI_USERS_FILE,
186 constants.CONFD_HMAC_KEY,
# Each hypervisor may declare extra ancillary files which are also
# legitimate upload targets.
189 for hv_name in constants.HYPER_TYPES:
190 hv_class = hypervisor.GetHypervisorClass(hv_name)
191 allowed_files.update(hv_class.GetAncillaryFiles())
193 return frozenset(allowed_files)
# Computed once at import time; consulted by UploadFile to validate
# requested target paths.
196 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
200 """Removes job queue files and archived jobs.
206 _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
207 _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
211 """Returns master information.
213 This is an utility function to compute master information, either
214 for consumption here or from the node daemon.
217 @return: master_netdev, master_ip, master_name
218 @raise RPCFail: in case of errors
223 master_netdev = cfg.GetMasterNetdev()
224 master_ip = cfg.GetMasterIP()
225 master_node = cfg.GetMasterNode()
226 except errors.ConfigurationError, err:
227 _Fail("Cluster configuration incomplete: %s", err, exc=True)
228 return (master_netdev, master_ip, master_node)
231 def StartMaster(start_daemons, no_voting):
232 """Activate local node as master node.
234 The function will always try activate the IP address of the master
235 (unless someone else has it). It will also start the master daemons,
236 based on the start_daemons parameter.
238 @type start_daemons: boolean
239 @param start_daemons: whether to also start the master
240 daemons (ganeti-masterd and ganeti-rapi)
241 @type no_voting: boolean
242 @param no_voting: whether to start ganeti-masterd without a node vote
243 (if start_daemons is True), but still non-interactively
247 # GetMasterInfo will raise an exception if not able to return data
248 master_netdev, master_ip, _ = GetMasterInfo()
# If the master IP already answers, it is either configured on this
# node (nothing to do) or held by another node (an error).
251 if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
252 if utils.OwnIpAddress(master_ip):
253 # we already have the ip:
254 logging.debug("Master IP already configured, doing nothing")
256 msg = "Someone else has the master ip, not activating"
260 result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
261 "dev", master_netdev, "label",
262 "%s:0" % master_netdev])
264 msg = "Can't activate master IP: %s" % result.output
# Gratuitous ARP so that neighbours update their caches quickly after
# the master IP moves to this node.
268 result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
269 "-s", master_ip, master_ip])
270 # we'll ignore the exit code of arping
272 # and now start the master and rapi daemons
275 masterd_args = "--no-voting --yes-do-it"
280 "EXTRA_MASTERD_ARGS": masterd_args,
283 result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
285 msg = "Can't start Ganeti master: %s" % result.output
# Errors are accumulated and reported together in a single RPCFail.
290 _Fail("; ".join(err_msgs))
293 def StopMaster(stop_daemons):
294 """Deactivate this node as master.
296 The function will always try to deactivate the IP address of the
297 master. It will also stop the master daemons depending on the
298 stop_daemons parameter.
300 @type stop_daemons: boolean
301 @param stop_daemons: whether to also stop the master daemons
302 (ganeti-masterd and ganeti-rapi)
306 # TODO: log and report back to the caller the error failures; we
307 # need to decide in which case we fail the RPC for this
309 # GetMasterInfo will raise an exception if not able to return data
310 master_netdev, master_ip, _ = GetMasterInfo()
312 result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
313 "dev", master_netdev])
# Failure to remove the IP is only logged, not fatal: demotion must
# still proceed even if the address is already gone.
315 logging.error("Can't remove the master IP, error: %s", result.output)
316 # but otherwise ignore the failure
319 result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
321 logging.error("Could not stop Ganeti master, command %s had exitcode %s"
323 result.cmd, result.exit_code, result.output)
326 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
327 """Joins this node to the cluster.
329 This does the following:
330 - updates the hostkeys of the machine (rsa and dsa)
331 - adds the ssh private key to the user
332 - adds the ssh public key to the users' authorized_keys file
335 @param dsa: the DSA private key to write
337 @param dsapub: the DSA public key to write
339 @param rsa: the RSA private key to write
341 @param rsapub: the RSA public key to write
343 @param sshkey: the SSH private key to write
345 @param sshpub: the SSH public key to write
347 @return: the success of the operation
# Host keys: private keys are written mode 0600, public keys 0644.
350 sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
351 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
352 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
353 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
354 for name, content, mode in sshd_keys:
355 utils.WriteFile(name, data=content, mode=mode)
358 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
360 except errors.OpExecError, err:
361 _Fail("Error while processing user ssh files: %s", err, exc=True)
363 for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
364 utils.WriteFile(name, data=content, mode=0600)
366 utils.AddAuthorizedKey(auth_keys, sshpub)
# sshd must be told to reload so the new host keys take effect.
368 result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
370 _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
371 result.cmd, result.exit_code, result.output)
374 def LeaveCluster(modify_ssh_setup):
375 """Cleans up and remove the current node.
377 This function cleans up and prepares the current node to be removed
380 If processing is successful, then it raises an
381 L{errors.QuitGanetiException} which is used as a special case to
382 shutdown the node daemon.
384 @param modify_ssh_setup: boolean
387 _CleanDirectory(constants.DATA_DIR)
392 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
394 utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
396 utils.RemoveFile(priv_key)
397 utils.RemoveFile(pub_key)
398 except errors.OpExecError:
# Best-effort: ssh cleanup problems are logged but must not prevent
# the node from leaving the cluster.
399 logging.exception("Error while processing ssh files")
402 utils.RemoveFile(constants.CONFD_HMAC_KEY)
403 utils.RemoveFile(constants.RAPI_CERT_FILE)
404 utils.RemoveFile(constants.NODED_CERT_FILE)
405 except: # pylint: disable-msg=W0702
406 logging.exception("Error while removing cluster secrets")
408 result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
410 logging.error("Command %s failed with exitcode %s and error %s",
411 result.cmd, result.exit_code, result.output)
413 # Raise a custom exception (handled in ganeti-noded)
414 raise errors.QuitGanetiException(True, 'Shutdown scheduled')
417 def GetNodeInfo(vgname, hypervisor_type):
418 """Gives back a hash with different information about the node.
420 @type vgname: C{string}
421 @param vgname: the name of the volume group to ask for disk space information
422 @type hypervisor_type: C{str}
423 @param hypervisor_type: the name of the hypervisor to ask for
426 @return: dictionary with the following keys:
427 - vg_size is the size of the configured volume group in MiB
428 - vg_free is the free size of the volume group in MiB
429 - memory_dom0 is the memory allocated for domain0 in MiB
430 - memory_free is the currently available (free) ram in MiB
431 - memory_total is the total number of ram in MiB
435 vginfo = _GetVGInfo(vgname)
436 outputarray['vg_size'] = vginfo['vg_size']
437 outputarray['vg_free'] = vginfo['vg_free']
439 hyper = hypervisor.GetHypervisor(hypervisor_type)
440 hyp_info = hyper.GetNodeInfo()
441 if hyp_info is not None:
442 outputarray.update(hyp_info)
# The boot id changes on every reboot, letting the master detect
# unexpected node restarts.
444 outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
449 def VerifyNode(what, cluster_name):
450 """Verify the status of the local node.
452 Based on the input L{what} parameter, various checks are done on the
455 If the I{filelist} key is present, this list of
456 files is checksummed and the file/checksum pairs are returned.
458 If the I{nodelist} key is present, we check that we have
459 connectivity via ssh with the target nodes (and check the hostname
462 If the I{node-net-test} key is present, we check that we have
463 connectivity to the given nodes via both primary IP and, if
464 applicable, secondary IPs.
467 @param what: a dictionary of things to check:
468 - filelist: list of files for which to compute checksums
469 - nodelist: list of nodes we should check ssh communication with
470 - node-net-test: list of nodes we should check node daemon port
472 - hypervisor: list with hypervisors to run the verify for
474 @return: a dictionary with the same keys as the input dict, and
475 values representing the result of the checks
# Each NV_* key in 'what' selects an independent check; results are
# accumulated per key into the returned dict.
480 if constants.NV_HYPERVISOR in what:
481 result[constants.NV_HYPERVISOR] = tmp = {}
482 for hv_name in what[constants.NV_HYPERVISOR]:
484 val = hypervisor.GetHypervisor(hv_name).Verify()
485 except errors.HypervisorError, err:
486 val = "Error while checking hypervisor: %s" % str(err)
489 if constants.NV_FILELIST in what:
490 result[constants.NV_FILELIST] = utils.FingerprintFiles(
491 what[constants.NV_FILELIST])
493 if constants.NV_NODELIST in what:
494 result[constants.NV_NODELIST] = tmp = {}
# Shuffle so that not all nodes hammer the same targets in the same
# order during a cluster-wide verify.
495 random.shuffle(what[constants.NV_NODELIST])
496 for node in what[constants.NV_NODELIST]:
497 success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
501 if constants.NV_NODENETTEST in what:
502 result[constants.NV_NODENETTEST] = tmp = {}
503 my_name = utils.HostInfo().name
504 my_pip = my_sip = None
# First pass: find our own primary/secondary IPs in the test list.
505 for name, pip, sip in what[constants.NV_NODENETTEST]:
511 tmp[my_name] = ("Can't find my own primary/secondary IP"
514 port = utils.GetDaemonPort(constants.NODED)
515 for name, pip, sip in what[constants.NV_NODENETTEST]:
517 if not utils.TcpPing(pip, port, source=my_pip):
518 fail.append("primary")
520 if not utils.TcpPing(sip, port, source=my_sip):
521 fail.append("secondary")
523 tmp[name] = ("failure using the %s interface(s)" %
526 if constants.NV_LVLIST in what:
528 val = GetVolumeList(what[constants.NV_LVLIST])
531 result[constants.NV_LVLIST] = val
533 if constants.NV_INSTANCELIST in what:
534 # GetInstanceList can fail
536 val = GetInstanceList(what[constants.NV_INSTANCELIST])
539 result[constants.NV_INSTANCELIST] = val
541 if constants.NV_VGLIST in what:
542 result[constants.NV_VGLIST] = utils.ListVolumeGroups()
544 if constants.NV_PVLIST in what:
545 result[constants.NV_PVLIST] = \
546 bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
547 filter_allocatable=False)
549 if constants.NV_VERSION in what:
550 result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
551 constants.RELEASE_VERSION)
553 if constants.NV_HVINFO in what:
554 hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
555 result[constants.NV_HVINFO] = hyper.GetNodeInfo()
557 if constants.NV_DRBDLIST in what:
559 used_minors = bdev.DRBD8.GetUsedDevs().keys()
560 except errors.BlockDeviceError, err:
# On failure a string (the error) is returned instead of a list, so
# the master can distinguish "no minors" from "couldn't query".
561 logging.warning("Can't get used minors list", exc_info=True)
562 used_minors = str(err)
563 result[constants.NV_DRBDLIST] = used_minors
565 if constants.NV_NODESETUP in what:
566 result[constants.NV_NODESETUP] = tmpr = []
567 if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
568 tmpr.append("The sysfs filesytem doesn't seem to be mounted"
569 " under /sys, missing required directories /sys/block"
570 " and /sys/class/net")
571 if (not os.path.isdir("/proc/sys") or
572 not os.path.isfile("/proc/sysrq-trigger")):
573 tmpr.append("The procfs filesystem doesn't seem to be mounted"
574 " under /proc, missing required directory /proc/sys and"
575 " the file /proc/sysrq-trigger")
577 if constants.NV_TIME in what:
578 result[constants.NV_TIME] = utils.SplitTime(time.time())
583 def GetVolumeList(vg_name):
584 """Compute list of logical volumes and their size.
587 @param vg_name: the volume group whose LVs we should list
590 dictionary of all partions (key) with value being a tuple of
591 their size (in MiB), inactive and online status::
593 {'test1': ('20.06', True, True)}
595 in case of errors, a string is returned with the error
601 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
602 "--separator=%s" % sep,
603 "-olv_name,lv_size,lv_attr", vg_name])
605 _Fail("Failed to list logical volumes, lvs output: %s", result.output)
# Expected per-line format: name|size|attr (attr is the 6-char
# lv_attr field); malformed lines are logged and skipped.
607 valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
608 for line in result.stdout.splitlines():
610 match = valid_line_re.match(line)
612 logging.error("Invalid line returned from lvs output: '%s'", line)
614 name, size, attr = match.groups()
# lv_attr positions used here: [0] volume type ('v' = virtual),
# [4] state ('-' = inactive), [5] device open status ('o' = open).
615 inactive = attr[4] == '-'
616 online = attr[5] == 'o'
617 virtual = attr[0] == 'v'
619 # we don't want to report such volumes as existing, since they
620 # don't really hold data
622 lvs[name] = (size, inactive, online)
627 def ListVolumeGroups():
628 """List the volume groups and their size.
631 @return: dictionary with keys volume name and values the
# Thin wrapper kept for RPC symmetry; the work happens in utils.
635 return utils.ListVolumeGroups()
639 """List all volumes on this node.
643 A list of dictionaries, each having four keys:
644 - name: the logical volume name,
645 - size: the size of the logical volume
646 - dev: the physical device on which the LV lives
647 - vg: the volume group to which it belongs
649 In case of errors, we return an empty list and log the
652 Note that since a logical volume can live on multiple physical
653 volumes, the resulting list might include a logical volume
657 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
659 "--options=lv_name,lv_size,devices,vg_name"])
661 _Fail("Failed to list logical volumes, lvs output: %s",
665 return dev.split('(')[0]
668 return [parse_dev(x) for x in dev.split(",")]
671 line = [v.strip() for v in line]
672 return [{'name': line[0], 'size': line[1],
673 'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
676 for line in result.stdout.splitlines():
677 if line.count('|') >= 3:
678 all_devs.extend(map_line(line.split('|')))
680 logging.warning("Strange line in the output from lvs: '%s'", line)
684 def BridgesExist(bridges_list):
685 """Check if a list of bridges exist on the current node.
688 @return: C{True} if all of them exist, C{False} otherwise
# Collect all missing bridges first so a single failure message can
# name every absent bridge at once.
692 for bridge in bridges_list:
693 if not utils.BridgeExists(bridge):
694 missing.append(bridge)
697 _Fail("Missing bridges %s", utils.CommaJoin(missing))
700 def GetInstanceList(hypervisor_list):
701 """Provides a list of instances.
703 @type hypervisor_list: list
704 @param hypervisor_list: the list of hypervisors to query information
707 @return: a list of all running instances on the current node
708 - instance1.example.com
709 - instance2.example.com
# Results from all requested hypervisors are concatenated; a failure
# in any single hypervisor fails the whole RPC.
713 for hname in hypervisor_list:
715 names = hypervisor.GetHypervisor(hname).ListInstances()
716 results.extend(names)
717 except errors.HypervisorError, err:
718 _Fail("Error enumerating instances (hypervisor %s): %s",
719 hname, err, exc=True)
724 def GetInstanceInfo(instance, hname):
725 """Gives back the information about an instance as a dictionary.
727 @type instance: string
728 @param instance: the instance name
730 @param hname: the hypervisor type of the instance
733 @return: dictionary with the following keys:
734 - memory: memory size of instance (int)
735 - state: xen state of instance (string)
736 - time: cpu time of instance (float)
741 iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
# iinfo is a tuple; indices 2/4/5 hold memory, state and cpu time.
742 if iinfo is not None:
743 output['memory'] = iinfo[2]
744 output['state'] = iinfo[4]
745 output['time'] = iinfo[5]
750 def GetInstanceMigratable(instance):
751 """Gives whether an instance can be migrated.
753 @type instance: L{objects.Instance}
754 @param instance: object representing the instance to be checked.
757 @return: tuple of (result, description) where:
758 - result: whether the instance can be migrated or not
759 - description: a description of the issue, if relevant
762 hyper = hypervisor.GetHypervisor(instance.hypervisor)
763 iname = instance.name
764 if iname not in hyper.ListInstances():
765 _Fail("Instance %s is not running", iname)
# Migration relies on the per-disk symlinks created at instance start;
# instances last started by very old versions lack them.
767 for idx in range(len(instance.disks)):
768 link_name = _GetBlockDevSymlinkPath(iname, idx)
769 if not os.path.islink(link_name):
770 _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
773 def GetAllInstancesInfo(hypervisor_list):
774 """Gather data about all instances.
776 This is the equivalent of L{GetInstanceInfo}, except that it
777 computes data for all instances at once, thus being faster if one
778 needs data about more than one instance.
780 @type hypervisor_list: list
781 @param hypervisor_list: list of hypervisors to query for instance data
784 @return: dictionary of instance: data, with data having the following keys:
785 - memory: memory size of instance (int)
786 - state: xen state of instance (string)
787 - time: cpu time of instance (float)
788 - vcpus: the number of vcpus
793 for hname in hypervisor_list:
794 iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
796 for name, _, memory, vcpus, state, times in iinfo:
804 # we only check static parameters, like memory and vcpus,
805 # and not state and time which can change between the
806 # invocations of the different hypervisors
807 for key in 'memory', 'vcpus':
808 if value[key] != output[name][key]:
# The same instance visible under two hypervisors with diverging
# static parameters indicates a real configuration problem.
809 _Fail("Instance %s is running twice"
810 " with different parameters", name)
816 def _InstanceLogName(kind, os_name, instance):
817 """Compute the OS log filename for a given instance and operation.
819 The instance name and os name are passed in as strings since not all
820 operations have these as part of an instance object.
823 @param kind: the operation type (e.g. add, import, etc.)
824 @type os_name: string
825 @param os_name: the os name
826 @type instance: string
827 @param instance: the name of the instance being imported/added/etc.
# A timestamp suffix keeps successive logs for the same instance apart.
830 base = ("%s-%s-%s-%s.log" %
831 (kind, os_name, instance, utils.TimestampForFilename()))
832 return utils.PathJoin(constants.LOG_OS_DIR, base)
835 def InstanceOsAdd(instance, reinstall, debug):
836 """Add an OS to an instance.
838 @type instance: L{objects.Instance}
839 @param instance: Instance whose OS is to be installed
840 @type reinstall: boolean
841 @param reinstall: whether this is an instance reinstall
843 @param debug: debug level, passed to the OS scripts
847 inst_os = OSFromDisk(instance.os)
849 create_env = OSEnvironment(instance, inst_os, debug)
# The OS create script checks INSTANCE_REINSTALL to distinguish a
# reinstall from a fresh install.
851 create_env['INSTANCE_REINSTALL'] = "1"
853 logfile = _InstanceLogName("add", instance.os, instance.name)
855 result = utils.RunCmd([inst_os.create_script], env=create_env,
856 cwd=inst_os.path, output=logfile,)
858 logging.error("os create command '%s' returned error: %s, logfile: %s,"
859 " output: %s", result.cmd, result.fail_reason, logfile,
# Only the tail of the script log is sent back to the master; the full
# output stays in the logfile on this node. log=False avoids logging
# the same failure twice.
861 lines = [utils.SafeEncode(val)
862 for val in utils.TailFile(logfile, lines=20)]
863 _Fail("OS create script failed (%s), last lines in the"
864 " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
867 def RunRenameInstance(instance, old_name, debug):
868 """Run the OS rename script for an instance.
870 @type instance: L{objects.Instance}
871 @param instance: Instance whose OS is to be installed
872 @type old_name: string
873 @param old_name: previous instance name
875 @param debug: debug level, passed to the OS scripts
877 @return: the success of the operation
880 inst_os = OSFromDisk(instance.os)
882 rename_env = OSEnvironment(instance, inst_os, debug)
# The rename script learns the previous name via the environment.
883 rename_env['OLD_INSTANCE_NAME'] = old_name
# Log file name records both old and new instance names.
885 logfile = _InstanceLogName("rename", instance.os,
886 "%s-%s" % (old_name, instance.name))
888 result = utils.RunCmd([inst_os.rename_script], env=rename_env,
889 cwd=inst_os.path, output=logfile)
892 logging.error("os create command '%s' returned error: %s output: %s",
893 result.cmd, result.fail_reason, result.output)
# Only the log tail goes back to the master; log=False avoids logging
# the same failure twice.
894 lines = [utils.SafeEncode(val)
895 for val in utils.TailFile(logfile, lines=20)]
896 _Fail("OS rename script failed (%s), last lines in the"
897 " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
900 def _GetVGInfo(vg_name):
901 """Get information about the volume group.
904 @param vg_name: the volume group which we query
907 A dictionary with the following keys:
908 - C{vg_size} is the total size of the volume group in MiB
909 - C{vg_free} is the free size of the volume group in MiB
910 - C{pv_count} are the number of physical disks in that VG
912 If an error occurs during gathering of data, we return the same dict
913 with keys all set to None.
# Start from all-None values so every error path can simply return the
# same dict shape.
916 retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
918 retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
919 "--nosuffix", "--units=m", "--separator=:", vg_name])
922 logging.error("volume group %s not present", vg_name)
# Trailing separator is stripped before splitting the ':'-separated
# size/free/count fields.
924 valarr = retval.stdout.strip().rstrip(':').split(':')
928 "vg_size": int(round(float(valarr[0]), 0)),
929 "vg_free": int(round(float(valarr[1]), 0)),
930 "pv_count": int(valarr[2]),
932 except (TypeError, ValueError), err:
933 logging.exception("Fail to parse vgs output: %s", err)
935 logging.error("vgs output has the wrong number of fields (expected"
936 " three): %s", str(valarr))
# Per-disk symlink path: DISK_LINKS_DIR/<instance_name>:<idx>.
940 def _GetBlockDevSymlinkPath(instance_name, idx):
941 return utils.PathJoin(constants.DISK_LINKS_DIR,
942 "%s:%d" % (instance_name, idx))
945 def _SymlinkBlockDev(instance_name, device_path, idx):
946 """Set up symlinks to a instance's block device.
948 This is an auxiliary function run when an instance is start (on the primary
949 node) or when an instance is migrated (on the target node).
952 @param instance_name: the name of the target instance
953 @param device_path: path of the physical block device, on the node
954 @param idx: the disk index
955 @return: absolute path to the disk's symlink
958 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
960 os.symlink(device_path, link_name)
# EEXIST is fine when the existing link already points at the right
# device; a stale or wrong link is replaced.
962 if err.errno == errno.EEXIST:
963 if (not os.path.islink(link_name) or
964 os.readlink(link_name) != device_path):
966 os.symlink(device_path, link_name)
973 def _RemoveBlockDevLinks(instance_name, disks):
974 """Remove the block device symlinks belonging to the given instance.
# Best-effort removal; problems are logged but never raised.
977 for idx, _ in enumerate(disks):
978 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
979 if os.path.islink(link_name):
983 logging.exception("Can't remove symlink '%s'", link_name)
986 def _GatherAndLinkBlockDevs(instance):
987 """Set up an instance's block device(s).
989 This is run on the primary node at instance startup. The block
990 devices must be already assembled.
992 @type instance: L{objects.Instance}
993 @param instance: the instance whose disks we shoul assemble
995 @return: list of (disk_object, device_path)
999 for idx, disk in enumerate(instance.disks):
1000 device = _RecursiveFindBD(disk)
1002 raise errors.BlockDeviceError("Block device '%s' is not set up." %
# The symlink (not the raw device path) is handed to the hypervisor,
# keeping device names stable across restarts.
1006 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1008 raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1011 block_devices.append((disk, link_name))
1013 return block_devices
1016 def StartInstance(instance):
1017 """Start an instance.
1019 @type instance: L{objects.Instance}
1020 @param instance: the instance object
# Starting an already-running instance is treated as a no-op.
1024 running_instances = GetInstanceList([instance.hypervisor])
1026 if instance.name in running_instances:
1027 logging.info("Instance %s already running, not starting", instance.name)
1031 block_devices = _GatherAndLinkBlockDevs(instance)
1032 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1033 hyper.StartInstance(instance, block_devices)
1034 except errors.BlockDeviceError, err:
1035 _Fail("Block device error: %s", err, exc=True)
1036 except errors.HypervisorError, err:
# On hypervisor failure, undo the disk symlinks created above.
1037 _RemoveBlockDevLinks(instance.name, instance.disks)
1038 _Fail("Hypervisor error: %s", err, exc=True)
1041 def InstanceShutdown(instance, timeout):
1042 """Shut an instance down.
1044 @note: this functions uses polling with a hardcoded timeout.
1046 @type instance: L{objects.Instance}
1047 @param instance: the instance object
1048 @type timeout: integer
1049 @param timeout: maximum timeout for soft shutdown
1053 hv_name = instance.hypervisor
1054 hyper = hypervisor.GetHypervisor(hv_name)
1055 iname = instance.name
1057 if instance.name not in hyper.ListInstances():
1058 logging.info("Instance %s not running, doing nothing", iname)
# Retry helper: the first StopInstance call is a plain soft shutdown;
# subsequent attempts pass retry=True to the hypervisor.
1063 self.tried_once = False
1066 if iname not in hyper.ListInstances():
1070 hyper.StopInstance(instance, retry=self.tried_once)
1071 except errors.HypervisorError, err:
1072 if iname not in hyper.ListInstances():
1073 # if the instance is no longer existing, consider this a
1074 # success and go to cleanup
1077 _Fail("Failed to stop instance %s: %s", iname, err)
1079 self.tried_once = True
1081 raise utils.RetryAgain()
# Soft shutdown is retried every 5 seconds up to 'timeout'; only then
# do we fall back to a forced stop.
1084 utils.Retry(_TryShutdown(), 5, timeout)
1085 except utils.RetryTimeout:
1086 # the shutdown did not succeed
1087 logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1090 hyper.StopInstance(instance, force=True)
1091 except errors.HypervisorError, err:
1092 if iname in hyper.ListInstances():
1093 # only raise an error if the instance still exists, otherwise
1094 # the error could simply be "instance ... unknown"!
1095 _Fail("Failed to force stop instance %s: %s", iname, err)
1099 if iname in hyper.ListInstances():
1100 _Fail("Could not shutdown instance %s even by destroy", iname)
1102 _RemoveBlockDevLinks(iname, instance.disks)
1105 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1106 """Reboot an instance.
1108 @type instance: L{objects.Instance}
1109 @param instance: the instance object to reboot
1110 @type reboot_type: str
1111 @param reboot_type: the type of reboot, one the following
1113 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1114 instance OS, do not recreate the VM
1115 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1116 restart the VM (at the hypervisor level)
1117 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1118 not accepted here, since that mode is handled differently, in
1119 cmdlib, and translates into full stop and start of the
1120 instance (instead of a call_instance_reboot RPC)
1121 @type shutdown_timeout: integer
1122 @param shutdown_timeout: maximum timeout for soft shutdown
1126 running_instances = GetInstanceList([instance.hypervisor])
1128 if instance.name not in running_instances:
1129 _Fail("Cannot reboot instance %s that is not running", instance.name)
1131 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1132 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1134 hyper.RebootInstance(instance)
1135 except errors.HypervisorError, err:
1136 _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1137 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
# Hard reboot is implemented as a shutdown followed by a start.
1139 InstanceShutdown(instance, shutdown_timeout)
1140 return StartInstance(instance)
1141 except errors.HypervisorError, err:
1142 _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1144 _Fail("Invalid reboot_type received: %s", reboot_type)
1147 def MigrationInfo(instance):
1148 """Gather information about an instance to be migrated.
1150 @type instance: L{objects.Instance}
1151 @param instance: the instance definition
# Delegates entirely to the hypervisor; the returned info is opaque to
# this layer and is passed as-is to AcceptInstance on the target node.
1154 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1156 info = hyper.MigrationInfo(instance)
1157 except errors.HypervisorError, err:
1158 _Fail("Failed to fetch migration information: %s", err, exc=True)
1162 def AcceptInstance(instance, info, target):
1163 """Prepare the node to accept an instance.
1165 @type instance: L{objects.Instance}
1166 @param instance: the instance definition
1167 @type info: string/data (opaque)
1168 @param info: migration information, from the source node
1169 @type target: string
1170 @param target: target host (usually ip), on this node
# Runs on the migration target; the hypervisor does the actual setup.
1173 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1175 hyper.AcceptInstance(instance, info, target)
1176 except errors.HypervisorError, err:
1177 _Fail("Failed to accept instance: %s", err, exc=True)
1180 def FinalizeMigration(instance, info, success):
1181 """Finalize any preparation to accept an instance.
1183 @type instance: L{objects.Instance}
1184 @param instance: the instance definition
1185 @type info: string/data (opaque)
1186 @param info: migration information, from the source node
1187 @type success: boolean
1188 @param success: whether the migration was a success or a failure
# Called on the target after migration finishes, in both the success
# and the failure case; the hypervisor decides what to clean up.
1191 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1193 hyper.FinalizeMigration(instance, info, success)
1194 except errors.HypervisorError, err:
1195 _Fail("Failed to finalize migration: %s", err, exc=True)
1198 def MigrateInstance(instance, target, live):
1199 """Migrates an instance to another node.
1201 @type instance: L{objects.Instance}
1202 @param instance: the instance definition
1203 @type target: string
1204 @param target: the target node name
1206 @param live: whether the migration should be done live or not (the
1207 interpretation of this parameter is left to the hypervisor)
1209 @return: a tuple of (success, msg) where:
1210 - succes is a boolean denoting the success/failure of the operation
1211 - msg is a string with details in case of failure
1214 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1217 hyper.MigrateInstance(instance, target, live)
1218 except errors.HypervisorError, err:
1219 _Fail("Failed to migrate instance: %s", err, exc=True)
1222 def BlockdevCreate(disk, size, owner, on_primary, info):
1223 """Creates a block device for an instance.
1225 @type disk: L{objects.Disk}
1226 @param disk: the object describing the disk we should create
1228 @param size: the size of the physical underlying device, in MiB
1230 @param owner: the name of the instance for which disk is created,
1231 used for device cache data
1232 @type on_primary: boolean
1233 @param on_primary: indicates if it is the primary node or not
1235 @param info: string that will be sent to the physical device
1236 creation, used for example to set (LVM) tags on LVs
1238 @return: the new unique_id of the device (this can sometime be
1239 computed only after creation), or None. On secondary nodes,
1240 it's not required to return anything.
1243 # TODO: remove the obsolete 'size' argument
1244 # pylint: disable-msg=W0613
1247 for child in disk.children:
1249 crdev = _RecursiveAssembleBD(child, owner, on_primary)
1250 except errors.BlockDeviceError, err:
1251 _Fail("Can't assemble device %s: %s", child, err)
1252 if on_primary or disk.AssembleOnSecondary():
1253 # we need the children open in case the device itself has to
1256 # pylint: disable-msg=E1103
1258 except errors.BlockDeviceError, err:
1259 _Fail("Can't make child '%s' read-write: %s", child, err)
1263 device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1264 except errors.BlockDeviceError, err:
1265 _Fail("Can't create block device: %s", err)
1267 if on_primary or disk.AssembleOnSecondary():
1270 except errors.BlockDeviceError, err:
1271 _Fail("Can't assemble device after creation, unusual event: %s", err)
1272 device.SetSyncSpeed(constants.SYNC_SPEED)
1273 if on_primary or disk.OpenOnSecondary():
1275 device.Open(force=True)
1276 except errors.BlockDeviceError, err:
1277 _Fail("Can't make device r/w after creation, unusual event: %s", err)
1278 DevCacheManager.UpdateCache(device.dev_path, owner,
1279 on_primary, disk.iv_name)
1281 device.SetInfo(info)
1283 return device.unique_id
1286 def BlockdevRemove(disk):
1287 """Remove a block device.
1289 @note: This is intended to be called recursively.
1291 @type disk: L{objects.Disk}
1292 @param disk: the disk object we should remove
1294 @return: the success of the operation
1299 rdev = _RecursiveFindBD(disk)
1300 except errors.BlockDeviceError, err:
1301 # probably can't attach
1302 logging.info("Can't attach to device %s in remove", disk)
1304 if rdev is not None:
1305 r_path = rdev.dev_path
1308 except errors.BlockDeviceError, err:
1309 msgs.append(str(err))
1311 DevCacheManager.RemoveCache(r_path)
1314 for child in disk.children:
1316 BlockdevRemove(child)
1317 except RPCFail, err:
1318 msgs.append(str(err))
1321 _Fail("; ".join(msgs))
1324 def _RecursiveAssembleBD(disk, owner, as_primary):
1325 """Activate a block device for an instance.
1327 This is run on the primary and secondary nodes for an instance.
1329 @note: this function is called recursively.
1331 @type disk: L{objects.Disk}
1332 @param disk: the disk we try to assemble
1334 @param owner: the name of the instance which owns the disk
1335 @type as_primary: boolean
1336 @param as_primary: if we should make the block device
1339 @return: the assembled device or None (in case no device
1341 @raise errors.BlockDeviceError: in case there is an error
1342 during the activation of the children or the device
1348 mcn = disk.ChildrenNeeded()
1350 mcn = 0 # max number of Nones allowed
1352 mcn = len(disk.children) - mcn # max number of Nones
1353 for chld_disk in disk.children:
1355 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1356 except errors.BlockDeviceError, err:
1357 if children.count(None) >= mcn:
1360 logging.error("Error in child activation (but continuing): %s",
1362 children.append(cdev)
1364 if as_primary or disk.AssembleOnSecondary():
1365 r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1366 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1368 if as_primary or disk.OpenOnSecondary():
1370 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1371 as_primary, disk.iv_name)
1378 def BlockdevAssemble(disk, owner, as_primary):
1379 """Activate a block device for an instance.
1381 This is a wrapper over _RecursiveAssembleBD.
1383 @rtype: str or boolean
1384 @return: a C{/dev/...} path for primary nodes, and
1385 C{True} for secondary nodes
1389 result = _RecursiveAssembleBD(disk, owner, as_primary)
1390 if isinstance(result, bdev.BlockDev):
1391 # pylint: disable-msg=E1103
1392 result = result.dev_path
1393 except errors.BlockDeviceError, err:
1394 _Fail("Error while assembling disk: %s", err, exc=True)
1399 def BlockdevShutdown(disk):
1400 """Shut down a block device.
1402 First, if the device is assembled (Attach() is successful), then
1403 the device is shutdown. Then the children of the device are
1406 This function is called recursively. Note that we don't cache the
1407 children or such, as oppossed to assemble, shutdown of different
1408 devices doesn't require that the upper device was active.
1410 @type disk: L{objects.Disk}
1411 @param disk: the description of the disk we should
1417 r_dev = _RecursiveFindBD(disk)
1418 if r_dev is not None:
1419 r_path = r_dev.dev_path
1422 DevCacheManager.RemoveCache(r_path)
1423 except errors.BlockDeviceError, err:
1424 msgs.append(str(err))
1427 for child in disk.children:
1429 BlockdevShutdown(child)
1430 except RPCFail, err:
1431 msgs.append(str(err))
1434 _Fail("; ".join(msgs))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @raise RPCFail: if the parent or any child device cannot be found

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  # all children must be found before we touch the parent
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @raise RPCFail: if the parent or any child device cannot be resolved

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      # no static path; resolve the child to its live device path
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return: a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1529 def BlockdevFind(disk):
1530 """Check if a device is activated.
1532 If it is, return information about the real device.
1534 @type disk: L{objects.Disk}
1535 @param disk: the disk to find
1536 @rtype: None or objects.BlockDevStatus
1537 @return: None if the disk cannot be found, otherwise a the current
1542 rbd = _RecursiveFindBD(disk)
1543 except errors.BlockDeviceError, err:
1544 _Fail("Failed to find device: %s", err, exc=True)
1549 return rbd.GetSyncStatus()
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  # refuse anything outside the explicit whitelist of config files
  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)
1671 def _ErrnoOrStr(err):
1672 """Format an EnvironmentError exception.
1674 If the L{err} argument has an errno attribute, it will be looked up
1675 and converted into a textual C{E...} description. Otherwise the
1676 string representation of the error will be returned.
1678 @type err: L{EnvironmentError}
1679 @param err: the exception to format
1682 if hasattr(err, 'errno'):
1683 detail = errno.errorcode[err.errno]
1689 def _OSOndiskAPIVersion(os_dir):
1690 """Compute and return the API version of a given OS.
1692 This function will try to read the API version of the OS residing in
1693 the 'os_dir' directory.
1696 @param os_dir: the directory in which we should look for the OS
1698 @return: tuple (status, data) with status denoting the validity and
1699 data holding either the vaid versions or an error message
1702 api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1705 st = os.stat(api_file)
1706 except EnvironmentError, err:
1707 return False, ("Required file '%s' not found under path %s: %s" %
1708 (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1710 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1711 return False, ("File '%s' in %s is not a regular file" %
1712 (constants.OS_API_FILE, os_dir))
1715 api_versions = utils.ReadFile(api_file).splitlines()
1716 except EnvironmentError, err:
1717 return False, ("Error while reading the API version file at %s: %s" %
1718 (api_file, _ErrnoOrStr(err)))
1721 api_versions = [int(version.strip()) for version in api_versions]
1722 except (TypeError, ValueError), err:
1723 return False, ("API version(s) can't be converted to integer: %s" %
1726 return True, api_versions
1729 def DiagnoseOS(top_dirs=None):
1730 """Compute the validity for all OSes.
1732 @type top_dirs: list
1733 @param top_dirs: the list of directories in which to
1734 search (if not given defaults to
1735 L{constants.OS_SEARCH_PATH})
1736 @rtype: list of L{objects.OS}
1737 @return: a list of tuples (name, path, status, diagnose, variants)
1738 for all (potential) OSes under all search paths, where:
1739 - name is the (potential) OS name
1740 - path is the full path to the OS
1741 - status True/False is the validity of the OS
1742 - diagnose is the error message for an invalid OS, otherwise empty
1743 - variants is a list of supported OS variants, if any
1746 if top_dirs is None:
1747 top_dirs = constants.OS_SEARCH_PATH
1750 for dir_name in top_dirs:
1751 if os.path.isdir(dir_name):
1753 f_names = utils.ListVisibleFiles(dir_name)
1754 except EnvironmentError, err:
1755 logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1757 for name in f_names:
1758 os_path = utils.PathJoin(dir_name, name)
1759 status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1762 variants = os_inst.supported_variants
1766 result.append((name, os_path, status, diagnose, variants))
1771 def _TryOSFromDisk(name, base_dir=None):
1772 """Create an OS instance from disk.
1774 This function will return an OS instance if the given name is a
1777 @type base_dir: string
1778 @keyword base_dir: Base directory containing OS installations.
1779 Defaults to a search in all the OS_SEARCH_PATH dirs.
1781 @return: success and either the OS instance if we find a valid one,
1785 if base_dir is None:
1786 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1788 os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1791 return False, "Directory for OS %s not found in search path" % name
1793 status, api_versions = _OSOndiskAPIVersion(os_dir)
1796 return status, api_versions
1798 if not constants.OS_API_VERSIONS.intersection(api_versions):
1799 return False, ("API version mismatch for path '%s': found %s, want %s." %
1800 (os_dir, api_versions, constants.OS_API_VERSIONS))
1802 # OS Files dictionary, we will populate it with the absolute path names
1803 os_files = dict.fromkeys(constants.OS_SCRIPTS)
1805 if max(api_versions) >= constants.OS_API_V15:
1806 os_files[constants.OS_VARIANTS_FILE] = ''
1808 for filename in os_files:
1809 os_files[filename] = utils.PathJoin(os_dir, filename)
1812 st = os.stat(os_files[filename])
1813 except EnvironmentError, err:
1814 return False, ("File '%s' under path '%s' is missing (%s)" %
1815 (filename, os_dir, _ErrnoOrStr(err)))
1817 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1818 return False, ("File '%s' under path '%s' is not a regular file" %
1821 if filename in constants.OS_SCRIPTS:
1822 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1823 return False, ("File '%s' under path '%s' is not executable" %
1827 if constants.OS_VARIANTS_FILE in os_files:
1828 variants_file = os_files[constants.OS_VARIANTS_FILE]
1830 variants = utils.ReadFile(variants_file).splitlines()
1831 except EnvironmentError, err:
1832 return False, ("Error while reading the OS variants file at %s: %s" %
1833 (variants_file, _ErrnoOrStr(err)))
1835 return False, ("No supported os variant found")
1837 os_obj = objects.OS(name=name, path=os_dir,
1838 create_script=os_files[constants.OS_SCRIPT_CREATE],
1839 export_script=os_files[constants.OS_SCRIPT_EXPORT],
1840 import_script=os_files[constants.OS_SCRIPT_IMPORT],
1841 rename_script=os_files[constants.OS_SCRIPT_RENAME],
1842 supported_variants=variants,
1843 api_versions=api_versions)
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type name: string
  @param name: the OS name to look for
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # an OS name may carry a "+variant" suffix; only the base name maps
  # to an on-disk directory
  name_only = name.split("+", 1)[0]
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  if api_version >= constants.OS_API_V15:
    try:
      # the variant is encoded in the instance OS name as "os+variant"
      variant = instance.os.split('+', 1)[1]
    except IndexError:
      variant = inst_os.supported_variants[0]
    result['OS_VARIANT'] = variant
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
1940 def BlockdevGrow(disk, amount):
1941 """Grow a stack of block devices.
1943 This function is called recursively, with the childrens being the
1944 first ones to resize.
1946 @type disk: L{objects.Disk}
1947 @param disk: the disk to be grown
1948 @rtype: (status, result)
1949 @return: a tuple with the status of the operation
1950 (True/False), and the errors message if status
1954 r_dev = _RecursiveFindBD(disk)
1956 _Fail("Cannot find block device %s", disk)
1960 except errors.BlockDeviceError, err:
1961 _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    # snapshot the LV backing the DRBD device
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
1994 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx, debug):
1995 """Export a block device snapshot to a remote node.
1997 @type disk: L{objects.Disk}
1998 @param disk: the description of the disk to export
1999 @type dest_node: str
2000 @param dest_node: the destination node to export to
2001 @type instance: L{objects.Instance}
2002 @param instance: the instance object to whom the disk belongs
2003 @type cluster_name: str
2004 @param cluster_name: the cluster name, needed for SSH hostalias
2006 @param idx: the index of the disk in the instance's disk list,
2007 used to export to the OS scripts environment
2008 @type debug: integer
2009 @param debug: debug level, passed to the OS scripts
2013 inst_os = OSFromDisk(instance.os)
2014 export_env = OSEnvironment(instance, inst_os, debug)
2016 export_script = inst_os.export_script
2018 logfile = _InstanceLogName("export", inst_os.name, instance.name)
2019 if not os.path.exists(constants.LOG_OS_DIR):
2020 os.mkdir(constants.LOG_OS_DIR, 0750)
2021 real_disk = _RecursiveFindBD(disk)
2022 if real_disk is None:
2023 _Fail("Block device '%s' is not set up", disk)
2027 export_env['EXPORT_DEVICE'] = real_disk.dev_path
2028 export_env['EXPORT_INDEX'] = str(idx)
2030 destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2031 destfile = disk.physical_id[1]
2033 # the target command is built out of three individual commands,
2034 # which are joined by pipes; we check each individual command for
2036 expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
2037 inst_os.path, export_script, logfile)
2041 destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s",
2042 destdir, utils.PathJoin(destdir, destfile))
2043 remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2044 constants.GANETI_RUNAS,
2047 # all commands have been checked, so we're safe to combine them
2048 command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
2050 result = utils.RunCmd(["bash", "-c", command], env=export_env)
2053 _Fail("OS snapshot export command '%s' returned error: %s"
2054 " output: %s", command, result.fail_reason, result.output)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file
  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
                 '%s' % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # atomically replace any previous export with the new one
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  # NOTE(review): despite the @rtype above, this returns the serialized
  # (dumped) form of the config, not the parser object itself
  return config.Dumps()
2156 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name, debug):
2157 """Import an os image into an instance.
2159 @type instance: L{objects.Instance}
2160 @param instance: instance to import the disks into
2161 @type src_node: string
2162 @param src_node: source node for the disk images
2163 @type src_images: list of string
2164 @param src_images: absolute paths of the disk images
2165 @type debug: integer
2166 @param debug: debug level, passed to the OS scripts
2167 @rtype: list of boolean
2168 @return: each boolean represent the success of importing the n-th disk
2171 inst_os = OSFromDisk(instance.os)
2172 import_env = OSEnvironment(instance, inst_os, debug)
2173 import_script = inst_os.import_script
2175 logfile = _InstanceLogName("import", instance.os, instance.name)
2176 if not os.path.exists(constants.LOG_OS_DIR):
2177 os.mkdir(constants.LOG_OS_DIR, 0750)
2180 impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2181 import_script, logfile)
2184 for idx, image in enumerate(src_images):
2186 destcmd = utils.BuildShellCmd('cat %s', image)
2187 remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2188 constants.GANETI_RUNAS,
2190 command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2191 import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2192 import_env['IMPORT_INDEX'] = str(idx)
2193 result = utils.RunCmd(command, env=import_env)
2195 logging.error("Disk import command '%s' returned error: %s"
2196 " output: %s", command, result.fail_reason,
2198 final_result.append("error importing disk %d: %s, %s" %
2199 (idx, result.fail_reason, result.output[-100]))
2202 _Fail("; ".join(final_result), log=False)
2206 """Return a list of exports currently available on this machine.
2209 @return: list of the exports
2212 if os.path.isdir(constants.EXPORT_DIR):
2213 return utils.ListVisibleFiles(constants.EXPORT_DIR)
2215 _Fail("No exports directory")
2218 def RemoveExport(export):
2219 """Remove an existing export from the node.
2222 @param export: the name of the export to remove
2226 target = utils.PathJoin(constants.EXPORT_DIR, export)
2229 shutil.rmtree(target)
2230 except EnvironmentError, err:
2231 _Fail("Error while removing the export: %s", err, exc=True)
2234 def BlockdevRename(devlist):
2235 """Rename a list of block devices.
2237 @type devlist: list of tuples
2238 @param devlist: list of tuples of the form (disk,
2239 new_logical_id, new_physical_id); disk is an
2240 L{objects.Disk} object describing the current disk,
2241 and new logical_id/physical_id is the name we
2244 @return: True if all renames succeeded, False otherwise
2249 for disk, unique_id in devlist:
2250 dev = _RecursiveFindBD(disk)
2252 msgs.append("Can't find device %s in rename" % str(disk))
2256 old_rpath = dev.dev_path
2257 dev.Rename(unique_id)
2258 new_rpath = dev.dev_path
2259 if old_rpath != new_rpath:
2260 DevCacheManager.RemoveCache(old_rpath)
2261 # FIXME: we should add the new cache information here, like:
2262 # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2263 # but we don't have the owner here - maybe parse from existing
2264 # cache? for now, we only lose lvm data when we rename, which
2265 # is less critical than DRBD or MD
2266 except errors.BlockDeviceError, err:
2267 msgs.append("Can't rename device '%s' to '%s': %s" %
2268 (dev, unique_id, err))
2269 logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2272 _Fail("; ".join(msgs))
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks wheter the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # reject any path outside the configured base directory
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
      base_file_storage_dir):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
2300 def CreateFileStorageDir(file_storage_dir):
2301 """Create file storage directory.
2303 @type file_storage_dir: str
2304 @param file_storage_dir: directory to create
2307 @return: tuple with first element a boolean indicating wheter dir
2308 creation was successful or not
2311 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2312 if os.path.exists(file_storage_dir):
2313 if not os.path.isdir(file_storage_dir):
2314 _Fail("Specified storage dir '%s' is not a directory",
2318 os.makedirs(file_storage_dir, 0750)
2319 except OSError, err:
2320 _Fail("Cannot create file storage directory '%s': %s",
2321 file_storage_dir, err, exc=True)
2324 def RemoveFileStorageDir(file_storage_dir):
2325 """Remove file storage directory.
2327 Remove it only if it's empty. If not log an error and return.
2329 @type file_storage_dir: str
2330 @param file_storage_dir: the directory we should cleanup
2331 @rtype: tuple (success,)
2332 @return: tuple of one element, C{success}, denoting
2333 whether the operation was successful
2336 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2337 if os.path.exists(file_storage_dir):
2338 if not os.path.isdir(file_storage_dir):
2339 _Fail("Specified Storage directory '%s' is not a directory",
2341 # deletes dir only if empty, otherwise we want to fail the rpc call
2343 os.rmdir(file_storage_dir)
2344 except OSError, err:
2345 _Fail("Cannot remove file storage directory '%s': %s",
2346 file_storage_dir, err)
2349 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2350 """Rename the file storage directory.
2352 @type old_file_storage_dir: str
2353 @param old_file_storage_dir: the current path
2354 @type new_file_storage_dir: str
2355 @param new_file_storage_dir: the name we should rename to
2356 @rtype: tuple (success,)
2357 @return: tuple of one element, C{success}, denoting
2358 whether the operation was successful
2361 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2362 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2363 if not os.path.exists(new_file_storage_dir):
2364 if os.path.isdir(old_file_storage_dir):
2366 os.rename(old_file_storage_dir, new_file_storage_dir)
2367 except OSError, err:
2368 _Fail("Cannot rename '%s' to '%s': %s",
2369 old_file_storage_dir, new_file_storage_dir, err)
2371 _Fail("Specified storage dir '%s' is not a directory",
2372 old_file_storage_dir)
2374 if os.path.exists(old_file_storage_dir):
2375 _Fail("Cannot rename '%s' to '%s': both locations exist",
2376 old_file_storage_dir, new_file_storage_dir)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # note: os.path.commonprefix compares characters, not path
  # components, so it would accept e.g. ".../queue-evil/x" against a
  # queue dir of ".../queue"; require a real component boundary
  # (subdirectories such as the job archive must remain accepted)
  norm_name = os.path.normpath(file_name)
  result = (norm_name == queue_dir or
            norm_name.startswith(queue_dir + os.sep))
  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents (possibly compressed; see
      the matching compression helper)
  @rtype: None

  """
  # refuse to touch anything outside the queue directory
  _EnsureJobQueueFile(file_name)

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None

  """
  # both the source and the destination must live inside the queue
  # directory (the destination may be in a subdirectory, e.g. the
  # job archive)
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  The drain state is represented by the presence of the flag file:
  creating it drains the queue, removing it resumes normal operation.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: None

  """
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2453 def BlockdevClose(instance_name, disks):
2454 """Closes the given block devices.
2456 This means they will be switched to secondary mode (in case of
2459 @param instance_name: if the argument is not empty, the symlinks
2460 of this instance will be removed
2461 @type disks: list of L{objects.Disk}
2462 @param disks: the list of disks to be closed
2463 @rtype: tuple (success, message)
2464 @return: a tuple of success and message, where success
2465 indicates the succes of the operation, and message
2466 which will contain the error details in case we
2472 rd = _RecursiveFindBD(cf)
2474 _Fail("Can't find device %s", cf)
2481 except errors.BlockDeviceError, err:
2482 msg.append(str(err))
2484 _Fail("Can't make devices secondary: %s", ",".join(msg))
2487 _RemoveBlockDevLinks(instance_name, disks)
2490 def ValidateHVParams(hvname, hvparams):
2491 """Validates the given hypervisor parameters.
2493 @type hvname: string
2494 @param hvname: the hypervisor name
2495 @type hvparams: dict
2496 @param hvparams: the hypervisor parameters to be validated
2501 hv_type = hypervisor.GetHypervisor(hvname)
2502 hv_type.ValidateParameters(hvparams)
2503 except errors.HypervisorError, err:
2504 _Fail(str(err), log=False)
2508 """Demotes the current node from master candidate role.
2511 # try to ensure we're not the master by mistake
2512 master, myself = ssconf.GetMasterAndMyself()
2513 if master == myself:
2514 _Fail("ssconf status shows I'm the master node, will not demote")
2516 result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2517 if not result.failed:
2518 _Fail("The master daemon is running, will not demote")
2521 if os.path.isfile(constants.CLUSTER_CONF_FILE):
2522 utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2523 except EnvironmentError, err:
2524 if err.errno != errno.ENOENT:
2525 _Fail("Error while backing up cluster file: %s", err, exc=True)
2527 utils.RemoveFile(constants.CLUSTER_CONF_FILE)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID on every disk before any lookup
  node_name = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(node_name, nodes_ip)

  bdevs = []
  for disk in disks:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(dev)
  return bdevs
2549 def DrbdDisconnectNet(nodes_ip, disks):
2550 """Disconnects the network on a list of drbd devices.
2553 bdevs = _FindDisks(nodes_ip, disks)
2559 except errors.BlockDeviceError, err:
2560 _Fail("Can't change network configuration to standalone mode: %s",
2564 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2565 """Attaches the network on a list of drbd devices.
2568 bdevs = _FindDisks(nodes_ip, disks)
2571 for idx, rd in enumerate(bdevs):
2573 _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2574 except EnvironmentError, err:
2575 _Fail("Can't create symlink: %s", err)
2576 # reconnect disks, switch to new master configuration and if
2577 # needed primary mode
2580 rd.AttachNet(multimaster)
2581 except errors.BlockDeviceError, err:
2582 _Fail("Can't change network configuration: %s", err)
2584 # wait until the disks are connected; we need to retry the re-attach
2585 # if the device becomes standalone, as this might happen if the one
2586 # node disconnects and reconnects in a different mode before the
2587 # other node reconnects; in this case, one or both of the nodes will
2588 # decide it has wrong configuration and switch to standalone
2591 all_connected = True
2594 stats = rd.GetProcStatus()
2596 all_connected = (all_connected and
2597 (stats.is_connected or stats.is_in_resync))
2599 if stats.is_standalone:
2600 # peer had different config info and this node became
2601 # standalone, even though this should not happen with the
2602 # new staged way of changing disk configs
2604 rd.AttachNet(multimaster)
2605 except errors.BlockDeviceError, err:
2606 _Fail("Can't change network configuration: %s", err)
2608 if not all_connected:
2609 raise utils.RetryAgain()
2612 # Start with a delay of 100 miliseconds and go up to 5 seconds
2613 utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2614 except utils.RetryTimeout:
2615 _Fail("Timeout in disk reconnecting")
2618 # change to primary mode
2622 except errors.BlockDeviceError, err:
2623 _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @rtype: tuple (alldone, min_resync)
  @return: whether all devices finished resyncing, and the minimum
      sync percentage seen across the devices

  """
  def _SyncStatus(dev):
    # retry helper: only stop retrying once the device is at least
    # connected or actively resyncing
    stats = dev.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for dev in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_SyncStatus, 1, 15, args=[dev])
    except utils.RetryTimeout:
      stats = dev.GetProcStatus()
      # one last check before giving up on this device
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", dev, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # parent: report back immediately, the child does the real work
    return "Reboot scheduled in 5 seconds"
  time.sleep(5)
  hyper.PowercycleNode()
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side, so it runs the hook scripts locally.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script
    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    # hook scripts live in e.g. "<hpath>-pre.d" under the base dir
    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    hook_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in hook_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator
    @raise RPCFail: if the script is not found or execution fails

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      # make sure the descriptor is closed even if the write fails,
      # otherwise we would leak one fd per failed allocation attempt
      try:
        os.write(fd, idata)
      finally:
        os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
2797 class DevCacheManager(object):
2798 """Simple class for managing a cache of block device information.
2801 _DEV_PREFIX = "/dev/"
2802 _ROOT_DIR = constants.BDEV_CACHE_DIR
2805 def _ConvertPath(cls, dev_path):
2806 """Converts a /dev/name path to the cache file name.
2808 This replaces slashes with underscores and strips the /dev
2809 prefix. It then returns the full path to the cache file.
2812 @param dev_path: the C{/dev/} path name
2814 @return: the converted path name
2817 if dev_path.startswith(cls._DEV_PREFIX):
2818 dev_path = dev_path[len(cls._DEV_PREFIX):]
2819 dev_path = dev_path.replace("/", "_")
2820 fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
2824 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2825 """Updates the cache information for a given device.
2828 @param dev_path: the pathname of the device
2830 @param owner: the owner (instance name) of the device
2831 @type on_primary: bool
2832 @param on_primary: whether this is the primary
2835 @param iv_name: the instance-visible name of the
2836 device, as in objects.Disk.iv_name
2841 if dev_path is None:
2842 logging.error("DevCacheManager.UpdateCache got a None dev_path")
2844 fpath = cls._ConvertPath(dev_path)
2850 iv_name = "not_visible"
2851 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2853 utils.WriteFile(fpath, data=fdata)
2854 except EnvironmentError, err:
2855 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2858 def RemoveCache(cls, dev_path):
2859 """Remove data for a dev_path.
2861 This is just a wrapper over L{utils.RemoveFile} with a converted
2862 path name and logging.
2865 @param dev_path: the pathname of the device
2870 if dev_path is None:
2871 logging.error("DevCacheManager.RemoveCache got a None dev_path")
2873 fpath = cls._ConvertPath(dev_path)
2875 utils.RemoveFile(fpath)
2876 except EnvironmentError, err:
2877 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)