4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25 the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27 in the L{_CleanDirectory} function
31 # pylint: disable-msg=E1103
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
51 from ganeti import errors
52 from ganeti import utils
53 from ganeti import ssh
54 from ganeti import hypervisor
55 from ganeti import constants
56 from ganeti import bdev
57 from ganeti import objects
58 from ganeti import ssconf
# File exposing the kernel's per-boot random UUID; read by GetNodeInfo to
# let the master detect node reboots (the value changes on every boot)
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
# Whitelist of directories that _CleanDirectory is allowed to operate on,
# as a safety net against accidental deletion elsewhere
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.JOB_QUEUE_ARCHIVE_DIR,
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """
def _Fail(msg, *args, **kwargs):
  """Log an error and the raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  # two keyword-only toggles: log=False suppresses logging entirely,
  # exc=True logs the current exception traceback along with the message
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Build and return an SshRunner for the given cluster.

  @type cluster_name: str
  @param cluster_name: name of the cluster, required by the
      L{ssh.SshRunner} constructor
  @rtype: L{ssh.SshRunner}
  @return: a newly constructed SshRunner

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client; a two-element (encoding, content)
      pair as visible below
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # payload is zlib-compressed, then base64-wrapped for transport
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  # refuse to touch anything outside the whitelist of cleanable dirs
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",

  if not os.path.isdir(path):

  # Normalize excluded paths
  exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
    # only plain files are removed; symlinks and subdirs are left alone
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  @rtype: frozenset
  @return: the set of file paths that UploadFile may write

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,

  # each hypervisor contributes its own ancillary files (configs, etc.)
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed_files)
# computed once at import time since the set of uploadable files is static
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
  """Removes job queue files and archived jobs.

  """
  # the queue lock file is excluded so the master daemon keeps its lock
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name
  @raise RPCFail: in case of errors

  """
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node)
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  # if the master IP already answers, decide whether it is us or a conflict
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
      msg = "Someone else has the master ip, not activating"
    # add the master IP as a /32 labelled alias on the master netdev
      result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                             "dev", master_netdev, "label",
                             "%s:0" % master_netdev])
        msg = "Can't activate master IP: %s" % result.output

      # gratuitous ARP so neighbours update their caches for the moved IP
      result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                             "-s", master_ip, master_ip])
      # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
      masterd_args = "--no-voting --yes-do-it"

      "EXTRA_MASTERD_ARGS": masterd_args,

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
      msg = "Can't start Ganeti master: %s" % result.output

    _Fail("; ".join(err_msgs))
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  # remove the /32 alias added by StartMaster
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                    result.cmd, result.exit_code, result.output)
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: boolean
  @return: the success of the operation

  """
  # host keys: private keys get restrictive 0600, public keys 0644
  sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
               (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
               (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
               (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
  except errors.OpExecError, err:
    _Fail("Error while processing user ssh files: %s", err, exc=True)

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # make sshd pick up the new host keys
  result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
    _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
          result.cmd, result.exit_code, result.output)
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(constants.DATA_DIR)

    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

    # best-effort removal of the node's cluster secrets
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable-msg=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  # memory information is delegated to the hypervisor
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # boot_id changes on every reboot, letting the master detect restarts
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  they report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: dict
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  my_name = utils.HostInfo().name
  port = utils.GetDaemonPort(constants.NODED)

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        # errors are reported as strings, not raised
        val = "Error while checking hypervisor: %s" % str(err)

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # shuffle so all nodes don't hit their peers in the same order
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    # first pass: find our own primary/secondary IPs in the list
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
      # second pass: ping every node from our own addresses
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if not utils.TcpPing(sip, port, source=my_sip):
          fail.append("secondary")
          tmp[name] = ("failure using the %s interface(s)" %

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.LOCALHOST_IP_ADDRESS
    result[constants.NV_MASTERIP] = utils.TcpPing(master_ip, port,

  if constants.NV_LVLIST in what:
      val = GetVolumeList(what[constants.NV_LVLIST])
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what:
    # GetInstanceList can fail
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      # on failure, the error text is returned instead of the list
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    # NOTE(review): "filesytem" typo in the message below (runtime string,
    # left unchanged here)
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  # expected format: name|size|6-char attr string, '|'-separated
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    match = valid_line_re.match(line)
      logging.error("Invalid line returned from lvs output: '%s'", line)
    name, size, attr = match.groups()
    # lv_attr flags: position 4 is state ('-' = inactive), position 5 is
    # the open/online flag ('o'), position 0 is the type ('v' = virtual)
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
      # we don't want to report such volumes as existing, since they
      # don't really hold data
    lvs[name] = (size, inactive, online)
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  volume_groups = utils.ListVolumeGroups()
  return volume_groups
  """List all volumes on this node.

  @rtype: list
  @return:
      A list of dictionaries, each having four keys:
        - name: the logical volume name,
        - size: the size of the logical volume
        - dev: the physical device on which the LV lives
        - vg: the volume group to which it belongs

      In case of errors, we return an empty list and log the
      error.

      Note that since a logical volume can live on multiple physical
      volumes, the resulting list might include a logical volume
      multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--options=lv_name,lv_size,devices,vg_name"])
    _Fail("Failed to list logical volumes, lvs output: %s",

    # NOTE(review): this and the next two returns belong to local helper
    # functions (parsing a device spec, splitting a device list, and
    # mapping one lvs output line to result dicts)
    return dev.split('(')[0]

    return [parse_dev(x) for x in dev.split(",")]

    line = [v.strip() for v in line]
    return [{'name': line[0], 'size': line[1],
             'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]

  for line in result.stdout.splitlines():
    if line.count('|') >= 3:
      all_devs.extend(map_line(line.split('|')))
      logging.warning("Strange line in the output from lvs: '%s'", line)
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  # collect all missing bridges so the error lists every one at once
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

    _Fail("Missing bridges %s", utils.CommaJoin(missing))
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
      - instance1.example.com
      - instance2.example.com

  """
  for hname in hypervisor_list:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # iinfo is a tuple; indices 2/4/5 hold memory, state and cpu time
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # every disk must have its symlink in place; a missing link means the
  # instance was started by a pre-1.2.5 ganeti and can't be migrated
  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
      for name, _, memory, vcpus, state, times in iinfo:
          # an instance seen by two hypervisors must agree on its
          # static parameters, otherwise it is an error
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @rtype: string
  @return: full path of the log file, inside L{constants.LOG_OS_DIR}

  """
  # the timestamp makes every operation's log file unique
  timestamp = utils.TimestampForFilename()
  basename = "%s-%s-%s-%s.log" % (kind, os_name, instance, timestamp)
  return utils.PathJoin(constants.LOG_OS_DIR, basename)
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
    # the OS create script distinguishes reinstalls via this env var
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    # log=False: the log tail is in the message itself, avoid double logging
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  # the rename script finds the previous name via this env var
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

    # NOTE(review): message says "os create command" but this is the rename
    # script — likely a copy-paste from InstanceOsAdd (runtime string left
    # unchanged here)
    logging.error("os create command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def _GetVGInfo(vg_name):
  """Get information about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
      A dictionary with the following keys:
        - C{vg_size} is the total size of the volume group in MiB
        - C{vg_free} is the free size of the volume group in MiB
        - C{pv_count} are the number of physical disks in that VG

      If an error occurs during gathering of data, we return the same dict
      with keys all set to None.

  """
  # start with all-None values; only overwritten on fully successful parse
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

    logging.error("volume group %s not present", vg_name)
  # output is a single ':'-separated line; strip the trailing separator
  valarr = retval.stdout.strip().rstrip(':').split(':')
      # vgs reports sizes as floats in MiB; round to whole MiB
      "vg_size": int(round(float(valarr[0]), 0)),
      "vg_free": int(round(float(valarr[1]), 0)),
      "pv_count": int(valarr[2]),
    except (TypeError, ValueError), err:
      logging.exception("Fail to parse vgs output: %s", err)
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for an instance's disk.

  The link lives in L{constants.DISK_LINKS_DIR} and is named
  "<instance_name>:<idx>".

  """
  link_basename = "%s:%d" % (instance_name, idx)
  return utils.PathJoin(constants.DISK_LINKS_DIR, link_basename)
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    os.symlink(device_path, link_name)
    # an existing link is only acceptable if it already points at the
    # right device; otherwise it is stale and must be recreated
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  # best-effort: a failed removal is logged, not raised
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
        logging.exception("Can't remove symlink '%s'", link_name)
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
      raise errors.BlockDeviceError("Block device '%s' is not set up." %

      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %

    block_devices.append((disk, link_name))

  return block_devices
def StartInstance(instance):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object

  """
  running_instances = GetInstanceList([instance.hypervisor])

  # starting an already-running instance is a silent no-op
  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)

    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    # undo the disk symlinks created above before failing the RPC
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)
def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  # shutting down a non-running instance is a silent no-op
  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)

      # NOTE(review): the following lines belong to a local retry-callable
      # (used with utils.Retry below) that attempts a soft stop, retrying
      # with retry=True after the first failed attempt
      self.tried_once = False

      if iname not in hyper.ListInstances():
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

    # poll the soft shutdown every 5 seconds, up to `timeout` total
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

  if iname in hyper.ListInstances():
    _Fail("Could not shutdown instance %s even by destroy", iname)

    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)
def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
      # a hard reboot is implemented as a full stop plus a fresh start
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
    _Fail("Invalid reboot_type received: %s", reboot_type)
def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    _Fail("Failed to accept instance: %s", err, exc=True)
1196 def FinalizeMigration(instance, info, success):
1197 """Finalize any preparation to accept an instance.
1199 @type instance: L{objects.Instance}
1200 @param instance: the instance definition
1201 @type info: string/data (opaque)
1202 @param info: migration information, from the source node
1203 @type success: boolean
1204 @param success: whether the migration was a success or a failure
1207 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1209 hyper.FinalizeMigration(instance, info, success)
1210 except errors.HypervisorError, err:
1211 _Fail("Failed to finalize migration: %s", err, exc=True)
1214 def MigrateInstance(instance, target, live):
1215 """Migrates an instance to another node.
1217 @type instance: L{objects.Instance}
1218 @param instance: the instance definition
1219 @type target: string
1220 @param target: the target node name
1222 @param live: whether the migration should be done live or not (the
1223 interpretation of this parameter is left to the hypervisor)
1225 @return: a tuple of (success, msg) where:
1226 - succes is a boolean denoting the success/failure of the operation
1227 - msg is a string with details in case of failure
1230 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1233 hyper.MigrateInstance(instance, target, live)
1234 except errors.HypervisorError, err:
1235 _Fail("Failed to migrate instance: %s", err, exc=True)
1238 def BlockdevCreate(disk, size, owner, on_primary, info):
1239 """Creates a block device for an instance.
1241 @type disk: L{objects.Disk}
1242 @param disk: the object describing the disk we should create
1244 @param size: the size of the physical underlying device, in MiB
1246 @param owner: the name of the instance for which disk is created,
1247 used for device cache data
1248 @type on_primary: boolean
1249 @param on_primary: indicates if it is the primary node or not
1251 @param info: string that will be sent to the physical device
1252 creation, used for example to set (LVM) tags on LVs
1254 @return: the new unique_id of the device (this can sometime be
1255 computed only after creation), or None. On secondary nodes,
1256 it's not required to return anything.
1259 # TODO: remove the obsolete 'size' argument
1260 # pylint: disable-msg=W0613
1263 for child in disk.children:
1265 crdev = _RecursiveAssembleBD(child, owner, on_primary)
1266 except errors.BlockDeviceError, err:
1267 _Fail("Can't assemble device %s: %s", child, err)
1268 if on_primary or disk.AssembleOnSecondary():
1269 # we need the children open in case the device itself has to
1272 # pylint: disable-msg=E1103
1274 except errors.BlockDeviceError, err:
1275 _Fail("Can't make child '%s' read-write: %s", child, err)
1279 device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1280 except errors.BlockDeviceError, err:
1281 _Fail("Can't create block device: %s", err)
1283 if on_primary or disk.AssembleOnSecondary():
1286 except errors.BlockDeviceError, err:
1287 _Fail("Can't assemble device after creation, unusual event: %s", err)
1288 device.SetSyncSpeed(constants.SYNC_SPEED)
1289 if on_primary or disk.OpenOnSecondary():
1291 device.Open(force=True)
1292 except errors.BlockDeviceError, err:
1293 _Fail("Can't make device r/w after creation, unusual event: %s", err)
1294 DevCacheManager.UpdateCache(device.dev_path, owner,
1295 on_primary, disk.iv_name)
1297 device.SetInfo(info)
1299 return device.unique_id
1302 def BlockdevRemove(disk):
1303 """Remove a block device.
1305 @note: This is intended to be called recursively.
1307 @type disk: L{objects.Disk}
1308 @param disk: the disk object we should remove
1310 @return: the success of the operation
1315 rdev = _RecursiveFindBD(disk)
1316 except errors.BlockDeviceError, err:
1317 # probably can't attach
1318 logging.info("Can't attach to device %s in remove", disk)
1320 if rdev is not None:
1321 r_path = rdev.dev_path
1324 except errors.BlockDeviceError, err:
1325 msgs.append(str(err))
1327 DevCacheManager.RemoveCache(r_path)
1330 for child in disk.children:
1332 BlockdevRemove(child)
1333 except RPCFail, err:
1334 msgs.append(str(err))
1337 _Fail("; ".join(msgs))
1340 def _RecursiveAssembleBD(disk, owner, as_primary):
1341 """Activate a block device for an instance.
1343 This is run on the primary and secondary nodes for an instance.
1345 @note: this function is called recursively.
1347 @type disk: L{objects.Disk}
1348 @param disk: the disk we try to assemble
1350 @param owner: the name of the instance which owns the disk
1351 @type as_primary: boolean
1352 @param as_primary: if we should make the block device
1355 @return: the assembled device or None (in case no device
1357 @raise errors.BlockDeviceError: in case there is an error
1358 during the activation of the children or the device
1364 mcn = disk.ChildrenNeeded()
1366 mcn = 0 # max number of Nones allowed
1368 mcn = len(disk.children) - mcn # max number of Nones
1369 for chld_disk in disk.children:
1371 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1372 except errors.BlockDeviceError, err:
1373 if children.count(None) >= mcn:
1376 logging.error("Error in child activation (but continuing): %s",
1378 children.append(cdev)
1380 if as_primary or disk.AssembleOnSecondary():
1381 r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1382 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1384 if as_primary or disk.OpenOnSecondary():
1386 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1387 as_primary, disk.iv_name)
1394 def BlockdevAssemble(disk, owner, as_primary):
1395 """Activate a block device for an instance.
1397 This is a wrapper over _RecursiveAssembleBD.
1399 @rtype: str or boolean
1400 @return: a C{/dev/...} path for primary nodes, and
1401 C{True} for secondary nodes
1405 result = _RecursiveAssembleBD(disk, owner, as_primary)
1406 if isinstance(result, bdev.BlockDev):
1407 # pylint: disable-msg=E1103
1408 result = result.dev_path
1409 except errors.BlockDeviceError, err:
1410 _Fail("Error while assembling disk: %s", err, exc=True)
1415 def BlockdevShutdown(disk):
1416 """Shut down a block device.
1418 First, if the device is assembled (Attach() is successful), then
1419 the device is shutdown. Then the children of the device are
1422 This function is called recursively. Note that we don't cache the
1423 children or such, as oppossed to assemble, shutdown of different
1424 devices doesn't require that the upper device was active.
1426 @type disk: L{objects.Disk}
1427 @param disk: the description of the disk we should
1433 r_dev = _RecursiveFindBD(disk)
1434 if r_dev is not None:
1435 r_path = r_dev.dev_path
1438 DevCacheManager.RemoveCache(r_path)
1439 except errors.BlockDeviceError, err:
1440 msgs.append(str(err))
1443 for child in disk.children:
1445 BlockdevShutdown(child)
1446 except RPCFail, err:
1447 msgs.append(str(err))
1450 _Fail("; ".join(msgs))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  Looks up the parent device, resolves each requested child to its
  active block device and attaches them all to the parent.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)

  # resolve every requested child; a None entry means the child device
  # could not be found on this node
  new_bdevs = []
  for cdev in new_cdevs:
    new_bdevs.append(_RecursiveFindBD(cdev))
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)

  parent_bdev.AddChildren(new_bdevs)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)

  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      # no static path known: look the child up via the device cache
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)

  parent_bdev.RemoveChildren(devs)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open
  @return: the opened block device instance
  @raise RPCFail: if the device is not set up on this node

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk
1561 def BlockdevFind(disk):
1562 """Check if a device is activated.
1564 If it is, return information about the real device.
1566 @type disk: L{objects.Disk}
1567 @param disk: the disk to find
1568 @rtype: None or objects.BlockDevStatus
1569 @return: None if the disk cannot be found, otherwise a the current
1574 rbd = _RecursiveFindBD(disk)
1575 except errors.BlockDeviceError, err:
1576 _Fail("Failed to find device: %s", err, exc=True)
1581 return rbd.GetSyncStatus()
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size of the disk

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format

  """
  if hasattr(err, 'errno'):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail
1717 def _OSOndiskAPIVersion(os_dir):
1718 """Compute and return the API version of a given OS.
1720 This function will try to read the API version of the OS residing in
1721 the 'os_dir' directory.
1724 @param os_dir: the directory in which we should look for the OS
1726 @return: tuple (status, data) with status denoting the validity and
1727 data holding either the vaid versions or an error message
1730 api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1733 st = os.stat(api_file)
1734 except EnvironmentError, err:
1735 return False, ("Required file '%s' not found under path %s: %s" %
1736 (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1738 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1739 return False, ("File '%s' in %s is not a regular file" %
1740 (constants.OS_API_FILE, os_dir))
1743 api_versions = utils.ReadFile(api_file).splitlines()
1744 except EnvironmentError, err:
1745 return False, ("Error while reading the API version file at %s: %s" %
1746 (api_file, _ErrnoOrStr(err)))
1749 api_versions = [int(version.strip()) for version in api_versions]
1750 except (TypeError, ValueError), err:
1751 return False, ("API version(s) can't be converted to integer: %s" %
1754 return True, api_versions
1757 def DiagnoseOS(top_dirs=None):
1758 """Compute the validity for all OSes.
1760 @type top_dirs: list
1761 @param top_dirs: the list of directories in which to
1762 search (if not given defaults to
1763 L{constants.OS_SEARCH_PATH})
1764 @rtype: list of L{objects.OS}
1765 @return: a list of tuples (name, path, status, diagnose, variants)
1766 for all (potential) OSes under all search paths, where:
1767 - name is the (potential) OS name
1768 - path is the full path to the OS
1769 - status True/False is the validity of the OS
1770 - diagnose is the error message for an invalid OS, otherwise empty
1771 - variants is a list of supported OS variants, if any
1774 if top_dirs is None:
1775 top_dirs = constants.OS_SEARCH_PATH
1778 for dir_name in top_dirs:
1779 if os.path.isdir(dir_name):
1781 f_names = utils.ListVisibleFiles(dir_name)
1782 except EnvironmentError, err:
1783 logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1785 for name in f_names:
1786 os_path = utils.PathJoin(dir_name, name)
1787 status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1790 variants = os_inst.supported_variants
1794 result.append((name, os_path, status, diagnose, variants))
1799 def _TryOSFromDisk(name, base_dir=None):
1800 """Create an OS instance from disk.
1802 This function will return an OS instance if the given name is a
1805 @type base_dir: string
1806 @keyword base_dir: Base directory containing OS installations.
1807 Defaults to a search in all the OS_SEARCH_PATH dirs.
1809 @return: success and either the OS instance if we find a valid one,
1813 if base_dir is None:
1814 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1816 os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1819 return False, "Directory for OS %s not found in search path" % name
1821 status, api_versions = _OSOndiskAPIVersion(os_dir)
1824 return status, api_versions
1826 if not constants.OS_API_VERSIONS.intersection(api_versions):
1827 return False, ("API version mismatch for path '%s': found %s, want %s." %
1828 (os_dir, api_versions, constants.OS_API_VERSIONS))
1830 # OS Files dictionary, we will populate it with the absolute path names
1831 os_files = dict.fromkeys(constants.OS_SCRIPTS)
1833 if max(api_versions) >= constants.OS_API_V15:
1834 os_files[constants.OS_VARIANTS_FILE] = ''
1836 for filename in os_files:
1837 os_files[filename] = utils.PathJoin(os_dir, filename)
1840 st = os.stat(os_files[filename])
1841 except EnvironmentError, err:
1842 return False, ("File '%s' under path '%s' is missing (%s)" %
1843 (filename, os_dir, _ErrnoOrStr(err)))
1845 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1846 return False, ("File '%s' under path '%s' is not a regular file" %
1849 if filename in constants.OS_SCRIPTS:
1850 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1851 return False, ("File '%s' under path '%s' is not executable" %
1855 if constants.OS_VARIANTS_FILE in os_files:
1856 variants_file = os_files[constants.OS_VARIANTS_FILE]
1858 variants = utils.ReadFile(variants_file).splitlines()
1859 except EnvironmentError, err:
1860 return False, ("Error while reading the OS variants file at %s: %s" %
1861 (variants_file, _ErrnoOrStr(err)))
1863 return False, ("No supported os variant found")
1865 os_obj = objects.OS(name=name, path=os_dir,
1866 create_script=os_files[constants.OS_SCRIPT_CREATE],
1867 export_script=os_files[constants.OS_SCRIPT_EXPORT],
1868 import_script=os_files[constants.OS_SCRIPT_IMPORT],
1869 rename_script=os_files[constants.OS_SCRIPT_RENAME],
1870 supported_variants=variants,
1871 api_versions=api_versions)
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # strip any variant suffix ("name+variant") before the disk lookup
  name_only = name.split("+", 1)[0]
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  if api_version >= constants.OS_API_V15:
    try:
      # the variant is the part after "+" in the instance's OS name
      variant = instance.os.split('+', 1)[1]
    except IndexError:
      variant = inst_os.supported_variants[0]
    result['OS_VARIANT'] = variant
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
1965 def BlockdevGrow(disk, amount):
1966 """Grow a stack of block devices.
1968 This function is called recursively, with the childrens being the
1969 first ones to resize.
1971 @type disk: L{objects.Disk}
1972 @param disk: the disk to be grown
1973 @rtype: (status, result)
1974 @return: a tuple with the status of the operation
1975 (True/False), and the errors message if status
1979 r_dev = _RecursiveFindBD(disk)
1981 _Fail("Cannot find block device %s", disk)
1985 except errors.BlockDeviceError, err:
1986 _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    # snapshot the backing storage, not the DRBD layer itself
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx, debug):
  """Export a block device snapshot to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to whom the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)
  export_env = OSEnvironment(instance, inst_os, debug)

  export_script = inst_os.export_script

  logfile = _InstanceLogName("export", inst_os.name, instance.name)

  real_disk = _OpenRealBD(disk)

  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
                               inst_os.path, export_script, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s",
                                destdir, utils.PathJoin(destdir, destfile))
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command], env=export_env)

  if result.failed:
    _Fail("OS snapshot export command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file
  @rtype: None

  """
  destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
  config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
                 '%s' % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  # load the export config; without this read the section checks below
  # would always fail
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name, debug):
  """Import an os image into an instance.

  @type instance: L{objects.Instance}
  @param instance: instance to import the disks into
  @type src_node: string
  @param src_node: source node for the disk images
  @type src_images: list of string
  @param src_images: absolute paths of the disk images
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: list of boolean
  @return: each boolean represent the success of importing the n-th disk

  """
  inst_os = OSFromDisk(instance.os)
  import_env = OSEnvironment(instance, inst_os, debug)
  import_script = inst_os.import_script

  logfile = _InstanceLogName("import", instance.os, instance.name)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
                               import_script, logfile)

  final_result = []
  for idx, image in enumerate(src_images):
    if image:
      destcmd = utils.BuildShellCmd('cat %s', image)
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                       constants.GANETI_RUNAS,
                                                       destcmd)
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
      import_env['IMPORT_INDEX'] = str(idx)
      result = utils.RunCmd(command, env=import_env)
      if result.failed:
        logging.error("Disk import command '%s' returned error: %s"
                      " output: %s", command, result.fail_reason,
                      result.output)
        final_result.append("error importing disk %d: %s, %s" %
                            (idx, result.fail_reason, result.output[-100]))

  if final_result:
    _Fail("; ".join(final_result), log=False)
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  else:
    _Fail("No exports directory")
2236 def RemoveExport(export):
2237 """Remove an existing export from the node.
2240 @param export: the name of the export to remove
2244 target = utils.PathJoin(constants.EXPORT_DIR, export)
2247 shutil.rmtree(target)
2248 except EnvironmentError, err:
2249 _Fail("Error while removing the export: %s", err, exc=True)
2252 def BlockdevRename(devlist):
2253 """Rename a list of block devices.
2255 @type devlist: list of tuples
2256 @param devlist: list of tuples of the form (disk,
2257 new_logical_id, new_physical_id); disk is an
2258 L{objects.Disk} object describing the current disk,
2259 and new logical_id/physical_id is the name we
2262 @return: True if all renames succeeded, False otherwise
2267 for disk, unique_id in devlist:
2268 dev = _RecursiveFindBD(disk)
2270 msgs.append("Can't find device %s in rename" % str(disk))
2274 old_rpath = dev.dev_path
2275 dev.Rename(unique_id)
2276 new_rpath = dev.dev_path
2277 if old_rpath != new_rpath:
2278 DevCacheManager.RemoveCache(old_rpath)
2279 # FIXME: we should add the new cache information here, like:
2280 # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2281 # but we don't have the owner here - maybe parse from existing
2282 # cache? for now, we only lose lvm data when we rename, which
2283 # is less critical than DRBD or MD
2284 except errors.BlockDeviceError, err:
2285 msgs.append("Can't rename device '%s' to '%s': %s" %
2286 (dev, unique_id, err))
2287 logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2290 _Fail("; ".join(msgs))
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # Require the base dir itself or a path below a separator boundary; a
  # plain os.path.commonprefix test is character-wise and would also
  # accept a sibling directory that merely shares the prefix (e.g.
  # "/srv/ganeti2" under base "/srv/ganeti").
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.path.sep)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
2318 def CreateFileStorageDir(file_storage_dir):
2319 """Create file storage directory.
2321 @type file_storage_dir: str
2322 @param file_storage_dir: directory to create
2325 @return: tuple with first element a boolean indicating wheter dir
2326 creation was successful or not
2329 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2330 if os.path.exists(file_storage_dir):
2331 if not os.path.isdir(file_storage_dir):
2332 _Fail("Specified storage dir '%s' is not a directory",
2336 os.makedirs(file_storage_dir, 0750)
2337 except OSError, err:
2338 _Fail("Cannot create file storage directory '%s': %s",
2339 file_storage_dir, err, exc=True)
2342 def RemoveFileStorageDir(file_storage_dir):
2343 """Remove file storage directory.
2345 Remove it only if it's empty. If not log an error and return.
2347 @type file_storage_dir: str
2348 @param file_storage_dir: the directory we should cleanup
2349 @rtype: tuple (success,)
2350 @return: tuple of one element, C{success}, denoting
2351 whether the operation was successful
2354 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2355 if os.path.exists(file_storage_dir):
2356 if not os.path.isdir(file_storage_dir):
2357 _Fail("Specified Storage directory '%s' is not a directory",
2359 # deletes dir only if empty, otherwise we want to fail the rpc call
2361 os.rmdir(file_storage_dir)
2362 except OSError, err:
2363 _Fail("Cannot remove file storage directory '%s': %s",
2364 file_storage_dir, err)
2367 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2368 """Rename the file storage directory.
2370 @type old_file_storage_dir: str
2371 @param old_file_storage_dir: the current path
2372 @type new_file_storage_dir: str
2373 @param new_file_storage_dir: the name we should rename to
2374 @rtype: tuple (success,)
2375 @return: tuple of one element, C{success}, denoting
2376 whether the operation was successful
2379 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2380 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2381 if not os.path.exists(new_file_storage_dir):
2382 if os.path.isdir(old_file_storage_dir):
2384 os.rename(old_file_storage_dir, new_file_storage_dir)
2385 except OSError, err:
2386 _Fail("Cannot rename '%s' to '%s': %s",
2387 old_file_storage_dir, new_file_storage_dir, err)
2389 _Fail("Specified storage dir '%s' is not a directory",
2390 old_file_storage_dir)
2392 if os.path.exists(old_file_storage_dir):
2393 _Fail("Cannot rename '%s' to '%s': both locations exist",
2394 old_file_storage_dir, new_file_storage_dir)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # Normalize the candidate too, so a path such as "queue/../elsewhere"
  # cannot sneak past a textual check, and require a separator boundary
  # so a sibling like "queue2" is rejected (a plain commonprefix test
  # would accept both of those).
  norm_name = os.path.normpath(file_name)
  result = (norm_name == queue_dir or
            norm_name.startswith(queue_dir + os.path.sep))
  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  # refuse anything outside the queue directory
  _EnsureJobQueueFile(file_name)

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  # both endpoints of the rename must live inside the queue directory
  for name in (old, new):
    _EnsureJobQueueFile(name)

  utils.RenameFile(old, new, mkdir=True)
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: truple
  @return: always True, None
  @warning: the function always returns True

  """
  # the drain state is encoded as the mere existence of the drain file
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2471 def BlockdevClose(instance_name, disks):
2472 """Closes the given block devices.
2474 This means they will be switched to secondary mode (in case of
2477 @param instance_name: if the argument is not empty, the symlinks
2478 of this instance will be removed
2479 @type disks: list of L{objects.Disk}
2480 @param disks: the list of disks to be closed
2481 @rtype: tuple (success, message)
2482 @return: a tuple of success and message, where success
2483 indicates the succes of the operation, and message
2484 which will contain the error details in case we
2490 rd = _RecursiveFindBD(cf)
2492 _Fail("Can't find device %s", cf)
2499 except errors.BlockDeviceError, err:
2500 msg.append(str(err))
2502 _Fail("Can't make devices secondary: %s", ",".join(msg))
2505 _RemoveBlockDevLinks(instance_name, disks)
2508 def ValidateHVParams(hvname, hvparams):
2509 """Validates the given hypervisor parameters.
2511 @type hvname: string
2512 @param hvname: the hypervisor name
2513 @type hvparams: dict
2514 @param hvparams: the hypervisor parameters to be validated
2519 hv_type = hypervisor.GetHypervisor(hvname)
2520 hv_type.ValidateParameters(hvparams)
2521 except errors.HypervisorError, err:
2522 _Fail(str(err), log=False)
2526 """Demotes the current node from master candidate role.
2529 # try to ensure we're not the master by mistake
2530 master, myself = ssconf.GetMasterAndMyself()
2531 if master == myself:
2532 _Fail("ssconf status shows I'm the master node, will not demote")
2534 result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2535 if not result.failed:
2536 _Fail("The master daemon is running, will not demote")
2539 if os.path.isfile(constants.CLUSTER_CONF_FILE):
2540 utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2541 except EnvironmentError, err:
2542 if err.errno != errno.ENOENT:
2543 _Fail("Error while backing up cluster file: %s", err, exc=True)
2545 utils.RemoveFile(constants.CLUSTER_CONF_FILE)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @param nodes_ip: node IP information, passed through to
      L{objects.Disk.SetPhysicalID}
  @type disks: list of L{objects.Disk}
  @param disks: the disks to resolve
  @return: list of block devices corresponding to C{disks}
  @raises RPCFail: if any disk cannot be found

  """
  # set the correct physical ID
  my_name = utils.HostInfo().name
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs
2567 def DrbdDisconnectNet(nodes_ip, disks):
2568 """Disconnects the network on a list of drbd devices.
2571 bdevs = _FindDisks(nodes_ip, disks)
2577 except errors.BlockDeviceError, err:
2578 _Fail("Can't change network configuration to standalone mode: %s",
2582 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2583 """Attaches the network on a list of drbd devices.
2586 bdevs = _FindDisks(nodes_ip, disks)
2589 for idx, rd in enumerate(bdevs):
2591 _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2592 except EnvironmentError, err:
2593 _Fail("Can't create symlink: %s", err)
2594 # reconnect disks, switch to new master configuration and if
2595 # needed primary mode
2598 rd.AttachNet(multimaster)
2599 except errors.BlockDeviceError, err:
2600 _Fail("Can't change network configuration: %s", err)
2602 # wait until the disks are connected; we need to retry the re-attach
2603 # if the device becomes standalone, as this might happen if the one
2604 # node disconnects and reconnects in a different mode before the
2605 # other node reconnects; in this case, one or both of the nodes will
2606 # decide it has wrong configuration and switch to standalone
2609 all_connected = True
2612 stats = rd.GetProcStatus()
2614 all_connected = (all_connected and
2615 (stats.is_connected or stats.is_in_resync))
2617 if stats.is_standalone:
2618 # peer had different config info and this node became
2619 # standalone, even though this should not happen with the
2620 # new staged way of changing disk configs
2622 rd.AttachNet(multimaster)
2623 except errors.BlockDeviceError, err:
2624 _Fail("Can't change network configuration: %s", err)
2626 if not all_connected:
2627 raise utils.RetryAgain()
2630 # Start with a delay of 100 miliseconds and go up to 5 seconds
2631 utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2632 except utils.RetryTimeout:
2633 _Fail("Timeout in disk reconnecting")
2636 # change to primary mode
2640 except errors.BlockDeviceError, err:
2641 _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    # one polling step: raise RetryAgain while the device is neither
    # connected nor resyncing
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check before we give up on this device
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  @type hypervisor_type: str
  @param hypervisor_type: name of the hypervisor whose PowercycleNode
      method is used

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running on ram
  try:
    utils.Mlockall()
  except Exception: # pylint: disable-msg=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # feed the input data through a temporary file and always clean it up
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
2820 class DevCacheManager(object):
2821 """Simple class for managing a cache of block device information.
2824 _DEV_PREFIX = "/dev/"
2825 _ROOT_DIR = constants.BDEV_CACHE_DIR
2828 def _ConvertPath(cls, dev_path):
2829 """Converts a /dev/name path to the cache file name.
2831 This replaces slashes with underscores and strips the /dev
2832 prefix. It then returns the full path to the cache file.
2835 @param dev_path: the C{/dev/} path name
2837 @return: the converted path name
2840 if dev_path.startswith(cls._DEV_PREFIX):
2841 dev_path = dev_path[len(cls._DEV_PREFIX):]
2842 dev_path = dev_path.replace("/", "_")
2843 fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
2847 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2848 """Updates the cache information for a given device.
2851 @param dev_path: the pathname of the device
2853 @param owner: the owner (instance name) of the device
2854 @type on_primary: bool
2855 @param on_primary: whether this is the primary
2858 @param iv_name: the instance-visible name of the
2859 device, as in objects.Disk.iv_name
2864 if dev_path is None:
2865 logging.error("DevCacheManager.UpdateCache got a None dev_path")
2867 fpath = cls._ConvertPath(dev_path)
2873 iv_name = "not_visible"
2874 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2876 utils.WriteFile(fpath, data=fdata)
2877 except EnvironmentError, err:
2878 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2881 def RemoveCache(cls, dev_path):
2882 """Remove data for a dev_path.
2884 This is just a wrapper over L{utils.RemoveFile} with a converted
2885 path name and logging.
2888 @param dev_path: the pathname of the device
2893 if dev_path is None:
2894 logging.error("DevCacheManager.RemoveCache got a None dev_path")
2896 fpath = cls._ConvertPath(dev_path)
2898 utils.RemoveFile(fpath)
2899 except EnvironmentError, err:
2900 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)