4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25 the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27 in the L{_CleanDirectory} function
31 # pylint: disable-msg=E1103
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
# File whose contents change on every boot; read to detect node reboots
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
# Whitelist of directories that _CleanDirectory is allowed to wipe
# NOTE(review): initializer looks truncated here (no closing bracket
# visible) — confirm the full member list against upstream
_ALLOWED_CLEAN_DIRS = frozenset([
  constants.JOB_QUEUE_ARCHIVE_DIR,
  constants.CRYPTO_KEYS_DIR,
# Maximum validity of generated SSL certificates: one week, in seconds
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
# File names used inside an import/export X509 directory
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """
def _Fail(msg, *args, **kwargs):
  """Log an error and raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @keyword log: whether to log the error (default: True)
  @keyword exc: whether to log the full exception traceback

  """
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      # caller passed exc=True: include the active traceback in the log
      logging.exception(msg)
110 """Simple wrapper to return a SimpleStore.
112 @rtype: L{ssconf.SimpleStore}
113 @return: a SimpleStore instance
116 return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Build an L{ssh.SshRunner} for the given cluster.

  @type cluster_name: str
  @param cluster_name: name of the cluster, required by the
      SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a freshly constructed SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: (encoding, content) pair sent by the RPC client
  @return: Decompressed data
  @raise AssertionError: on unknown encoding

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  # NOTE(review): the RPC_ENCODING_NONE branch body appears elided here
  if encoding == constants.RPC_ENCODING_NONE:
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # content is a base64-encoded, zlib-compressed payload
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  # refuse to operate outside the whitelist of clean targets
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
  if not os.path.isdir(path):
  # Normalize excluded paths
  exclude = [os.path.normpath(i) for i in exclude]
  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
    # only plain files are removed; symlinks and subdirectories are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  @rtype: frozenset
  @return: the set of file names accepted for upload

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.CONFD_HMAC_KEY,
  # each hypervisor can declare additional ancillary files it needs
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())
  return frozenset(allowed_files)
# Computed once at import time; consulted by the UploadFile function
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
209 """Removes job queue files and archived jobs.
215 _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
216 _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
220 """Returns master information.
222 This is an utility function to compute master information, either
223 for consumption here or from the node daemon.
226 @return: master_netdev, master_ip, master_name
227 @raise RPCFail: in case of errors
232 master_netdev = cfg.GetMasterNetdev()
233 master_ip = cfg.GetMasterIP()
234 master_node = cfg.GetMasterNode()
235 except errors.ConfigurationError, err:
236 _Fail("Cluster configuration incomplete: %s", err, exc=True)
237 return (master_netdev, master_ip, master_node)
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try to activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()
  # if the master IP already answers, check whether it is ours
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    msg = "Someone else has the master ip, not activating"
  result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
    msg = "Can't activate master IP: %s" % result.output
  # gratuitous ARP so neighbours update their caches for the new owner
  result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                         "-s", master_ip, master_ip])
  # we'll ignore the exit code of arping
  # and now start the master and rapi daemons
    masterd_args = "--no-voting --yes-do-it"
      "EXTRA_MASTERD_ARGS": masterd_args,
  result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    msg = "Can't start Ganeti master: %s" % result.output
  _Fail("; ".join(err_msgs))
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
    logging.error("Can't remove the master IP, error: %s", result.output)
  # but otherwise ignore the failure
  result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  result.cmd, result.exit_code, result.output)
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: boolean
  @return: the success of the operation

  """
  # host keys are private 0600, public 0644
  sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
               (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
               (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
               (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
  except errors.OpExecError, err:
    _Fail("Error while processing user ssh files: %s", err, exc=True)
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)
  utils.AddAuthorizedKey(auth_keys, sshpub)
  # make sshd pick up the new host keys
  result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
    _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
          result.cmd, result.exit_code, result.output)
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @type modify_ssh_setup: boolean
  @param modify_ssh_setup: whether to also clean up the ssh keys

  """
  _CleanDirectory(constants.DATA_DIR)
  _CleanDirectory(constants.CRYPTO_KEYS_DIR)
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      # best-effort: log and continue with the rest of the cleanup
      logging.exception("Error while processing ssh files")
    utils.RemoveFile(constants.CONFD_HMAC_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.NODED_CERT_FILE)
  except: # pylint: disable-msg=W0702
    logging.exception("Error while removing cluster secrets")
  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)
  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)
  # boot id changes on every reboot, so it doubles as a reboot detector
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name, used for ssh checks
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  my_name = utils.HostInfo().name
  port = utils.GetDaemonPort(constants.NODED)
  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])
  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # shuffle so that all nodes don't contact the same targets in order
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    # first pass: find our own primary/secondary IPs in the list
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
    # second pass: ping every other node from our own addresses
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if not utils.TcpPing(pip, port, source=my_pip):
        fail.append("primary")
      if not utils.TcpPing(sip, port, source=my_sip):
        fail.append("secondary")
        tmp[name] = ("failure using the %s interface(s)" %
  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.LOCALHOST_IP_ADDRESS
    result[constants.NV_MASTERIP] = utils.TcpPing(master_ip, port,
  if constants.NV_LVLIST in what:
      val = GetVolumeList(what[constants.NV_LVLIST])
    result[constants.NV_LVLIST] = val
  if constants.NV_INSTANCELIST in what:
    # GetInstanceList can fail
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    result[constants.NV_INSTANCELIST] = val
  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
  if constants.NV_PVLIST in what:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)
  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)
  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
  if constants.NV_DRBDLIST in what:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      # report the error string instead of a minor list on failure
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors
  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")
  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
  # name|size|attr, e.g. "  lv0|1024.00|-wi-ao"
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    match = valid_line_re.match(line)
      logging.error("Invalid line returned from lvs output: '%s'", line)
    name, size, attr = match.groups()
    # attr positions per lvs(8): [0] volume type, [4] state, [5] device open
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
    # we don't want to report such volumes as existing, since they
    # don't really hold data
    lvs[name] = (size, inactive, online)
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  groups = utils.ListVolumeGroups()
  return groups
660 """List all volumes on this node.
664 A list of dictionaries, each having four keys:
665 - name: the logical volume name,
666 - size: the size of the logical volume
667 - dev: the physical device on which the LV lives
668 - vg: the volume group to which it belongs
670 In case of errors, we return an empty list and log the
673 Note that since a logical volume can live on multiple physical
674 volumes, the resulting list might include a logical volume
678 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
680 "--options=lv_name,lv_size,devices,vg_name"])
682 _Fail("Failed to list logical volumes, lvs output: %s",
686 return dev.split('(')[0]
689 return [parse_dev(x) for x in dev.split(",")]
692 line = [v.strip() for v in line]
693 return [{'name': line[0], 'size': line[1],
694 'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
697 for line in result.stdout.splitlines():
698 if line.count('|') >= 3:
699 all_devs.extend(map_line(line.split('|')))
701 logging.warning("Strange line in the output from lvs: '%s'", line)
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: the bridge names to check for
  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)
    # fail with the full list of missing bridges, not just the first
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  for hname in hypervisor_list:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance
  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # positions in the iinfo tuple: 2=memory, 4=state, 5=cpu time
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]
def GetInstanceMigratable(instance):
  """Compute whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hv = hypervisor.GetHypervisor(instance.hypervisor)
  name = instance.name

  if name not in hv.ListInstances():
    _Fail("Instance %s is not running", name)

  # every disk must have its symlink in place, otherwise the instance
  # was started by a version too old to support migration
  for idx, _ in enumerate(instance.disks):
    if not os.path.islink(_GetBlockDevSymlinkPath(name, idx)):
      _Fail("Instance %s was not restarted since ganeti 1.2.5", name)
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data
  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
      for name, _, memory, vcpus, state, times in iinfo:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
def _InstanceLogName(kind, os_name, instance):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @rtype: string
  @return: the full path of the log file

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  components = (kind, os_name, instance, utils.TimestampForFilename())
  return utils.PathJoin(constants.LOG_OS_DIR, "%s-%s-%s-%s.log" % components)
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)
  create_env = OSEnvironment(instance, inst_os, debug)
    # signal reinstalls to the OS create script via the environment
    create_env['INSTANCE_REINSTALL'] = "1"
  logfile = _InstanceLogName("add", instance.os, instance.name)
  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
    # include the script log tail in the error reported to the master
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)
  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env['OLD_INSTANCE_NAME'] = old_name
  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name))
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)
    # NOTE(review): message says "os create" but this runs the rename
    # script — looks like a copy/paste error in the log text
    logging.error("os create command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def _GetVGInfo(vg_name):
  """Get information about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
      A dictionary with the following keys:
          - C{vg_size} is the total size of the volume group in MiB
          - C{vg_free} is the free size of the volume group in MiB
          - C{pv_count} are the number of physical disks in that VG

      If an error occurs during gathering of data, we return the same dict
      with keys all set to None.

  """
  # default result: all keys present but unset, returned on any failure
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
    logging.error("volume group %s not present", vg_name)
  valarr = retval.stdout.strip().rstrip(':').split(':')
      "vg_size": int(round(float(valarr[0]), 0)),
      "vg_free": int(round(float(valarr[1]), 0)),
      "pv_count": int(valarr[2]),
    except (TypeError, ValueError), err:
      logging.exception("Fail to parse vgs output: %s", err)
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for an instance's disk.

  @type instance_name: string
  @param instance_name: the name of the owning instance
  @type idx: int
  @param idx: the index of the disk
  @rtype: string
  @return: path under constants.DISK_LINKS_DIR named "<instance>:<idx>"

  """
  link_basename = "%s:%d" % (instance_name, idx)
  return utils.PathJoin(constants.DISK_LINKS_DIR, link_basename)
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    os.symlink(device_path, link_name)
    if err.errno == errno.EEXIST:
      # a link already exists; replace it only if it points elsewhere
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @param instance_name: the name of the owning instance
  @param disks: iterable of the instance's disks (only the count is used)

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
        os.remove(link_name)
        # best-effort removal: log and keep processing remaining links
        logging.exception("Can't remove symlink '%s'", link_name)
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)
  @raise errors.BlockDeviceError: if a device cannot be found or linked

  """
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
    block_devices.append((disk, link_name))
  return block_devices
def StartInstance(instance):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])
  # starting an already-running instance is treated as a no-op
  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    # undo the disk symlinks created above before failing
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)
def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name
  # shutting down a stopped instance is treated as a no-op
  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
      self.tried_once = False
      if iname not in hyper.ListInstances():
        # retry=True on the second and later attempts
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
        _Fail("Failed to stop instance %s: %s", iname, err)
      self.tried_once = True
      raise utils.RetryAgain()
    # poll the soft shutdown every 5 seconds up to the given timeout
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)
  if iname in hyper.ListInstances():
    _Fail("Could not shutdown instance %s even by destroy", iname)
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
  _RemoveBlockDevLinks(iname, instance.disks)
def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])
  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
      # a hard reboot is implemented as a full stop followed by a start
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
    _Fail("Invalid reboot_type received: %s", reboot_type)
def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    _Fail("Failed to accept instance: %s", err, exc=True)
def FinalizeMigration(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hv = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hv.FinalizeMigration(instance, info, success)
  except errors.HypervisorError as err:
    _Fail("Failed to finalize migration: %s", err, exc=True)
def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  Any error reported by the hypervisor is turned into an L{RPCFail},
  so a normal return means the migration was started successfully.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)

  """
  hv = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hv.MigrateInstance(instance, target, live)
  except errors.HypervisorError as err:
    _Fail("Failed to migrate instance: %s", err, exc=True)
# NOTE(review): this block is a numbered listing with gaps in the source-line
# numbers; the elided lines (the 'clist' initialization and the 'try:'
# statements matched by the dangling 'except' clauses below) are missing from
# this view.  Comments only; visible code left byte-identical.
1249 def BlockdevCreate(disk, size, owner, on_primary, info):
1250 """Creates a block device for an instance.
1252 @type disk: L{objects.Disk}
1253 @param disk: the object describing the disk we should create
1255 @param size: the size of the physical underlying device, in MiB
1257 @param owner: the name of the instance for which disk is created,
1258 used for device cache data
1259 @type on_primary: boolean
1260 @param on_primary: indicates if it is the primary node or not
1262 @param info: string that will be sent to the physical device
1263 creation, used for example to set (LVM) tags on LVs
1265 @return: the new unique_id of the device (this can sometime be
1266 computed only after creation), or None. On secondary nodes,
1267 it's not required to return anything.
1270 # TODO: remove the obsolete 'size' argument
1271 # pylint: disable-msg=W0613
# Assemble every child first so the new device can be created on top of
# them; children are made read-write when needed on the primary side.
1274 for child in disk.children:
1276 crdev = _RecursiveAssembleBD(child, owner, on_primary)
1277 except errors.BlockDeviceError, err:
1278 _Fail("Can't assemble device %s: %s", child, err)
1279 if on_primary or disk.AssembleOnSecondary():
1280 # we need the children open in case the device itself has to
1283 # pylint: disable-msg=E1103
1285 except errors.BlockDeviceError, err:
1286 _Fail("Can't make child '%s' read-write: %s", child, err)
# Create the device on top of the assembled children; 'clist' is built in
# elided lines above -- presumably the list of child bdevs (TODO confirm).
1290 device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1291 except errors.BlockDeviceError, err:
1292 _Fail("Can't create block device: %s", err)
1294 if on_primary or disk.AssembleOnSecondary():
1297 except errors.BlockDeviceError, err:
1298 _Fail("Can't assemble device after creation, unusual event: %s", err)
1299 device.SetSyncSpeed(constants.SYNC_SPEED)
1300 if on_primary or disk.OpenOnSecondary():
1302 device.Open(force=True)
1303 except errors.BlockDeviceError, err:
1304 _Fail("Can't make device r/w after creation, unusual event: %s", err)
# Record the newly created device in the node-local device cache.
1305 DevCacheManager.UpdateCache(device.dev_path, owner,
1306 on_primary, disk.iv_name)
1308 device.SetInfo(info)
1310 return device.unique_id
# NOTE(review): numbered listing with elided lines; the 'msgs' accumulator
# initialization, the actual remove call on 'rdev' (around elided line 1334)
# and the final 'if msgs:' guard are not visible here.  Comments only.
1313 def BlockdevRemove(disk):
1314 """Remove a block device.
1316 @note: This is intended to be called recursively.
1318 @type disk: L{objects.Disk}
1319 @param disk: the disk object we should remove
1321 @return: the success of the operation
1326 rdev = _RecursiveFindBD(disk)
1327 except errors.BlockDeviceError, err:
1328 # probably can't attach
1329 logging.info("Can't attach to device %s in remove", disk)
1331 if rdev is not None:
1332 r_path = rdev.dev_path
# Errors are accumulated (not raised immediately) so that the children
# below are still processed; the combined message is raised at the end.
1335 except errors.BlockDeviceError, err:
1336 msgs.append(str(err))
1338 DevCacheManager.RemoveCache(r_path)
# Recurse into the children, collecting their failures as well.
1341 for child in disk.children:
1343 BlockdevRemove(child)
1344 except RPCFail, err:
1345 msgs.append(str(err))
1348 _Fail("; ".join(msgs))
# NOTE(review): numbered listing with elided lines; the 'children' list
# initialization, the 'try:'/'raise' around child assembly, the r_dev.Attach
# handling and the final 'return result' lines are missing from this view.
1351 def _RecursiveAssembleBD(disk, owner, as_primary):
1352 """Activate a block device for an instance.
1354 This is run on the primary and secondary nodes for an instance.
1356 @note: this function is called recursively.
1358 @type disk: L{objects.Disk}
1359 @param disk: the disk we try to assemble
1361 @param owner: the name of the instance which owns the disk
1362 @type as_primary: boolean
1363 @param as_primary: if we should make the block device
1366 @return: the assembled device or None (in case no device
1368 @raise errors.BlockDeviceError: in case there is an error
1369 during the activation of the children or the device
# 'mcn' tracks how many children are allowed to fail activation before the
# whole assembly is considered failed (mirrored devices tolerate some).
1375 mcn = disk.ChildrenNeeded()
1377 mcn = 0 # max number of Nones allowed
1379 mcn = len(disk.children) - mcn # max number of Nones
1380 for chld_disk in disk.children:
1382 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1383 except errors.BlockDeviceError, err:
1384 if children.count(None) >= mcn:
# Too many failed children already: the error propagates (elided 'raise');
# otherwise the failure is only logged and a None placeholder is appended.
1387 logging.error("Error in child activation (but continuing): %s",
1389 children.append(cdev)
1391 if as_primary or disk.AssembleOnSecondary():
1392 r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1393 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1395 if as_primary or disk.OpenOnSecondary():
1397 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1398 as_primary, disk.iv_name)
def BlockdevAssemble(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    assembled = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(assembled, bdev.BlockDev):
      # on the primary we hand back the device path, not the object
      # pylint: disable-msg=E1103
      assembled = assembled.dev_path
  except errors.BlockDeviceError as err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  return assembled
# NOTE(review): numbered listing with elided lines; the 'msgs' accumulator
# initialization, the actual shutdown call on 'r_dev' (around elided line
# 1448) and the final 'if msgs:' guard are not visible here.  Comments only.
1426 def BlockdevShutdown(disk):
1427 """Shut down a block device.
1429 First, if the device is assembled (Attach() is successful), then
1430 the device is shutdown. Then the children of the device are
1433 This function is called recursively. Note that we don't cache the
1434 children or such, as opposed to assemble, shutdown of different
1435 devices doesn't require that the upper device was active.
1437 @type disk: L{objects.Disk}
1438 @param disk: the description of the disk we should
1444 r_dev = _RecursiveFindBD(disk)
1445 if r_dev is not None:
1446 r_path = r_dev.dev_path
# Failures are collected rather than raised so the children below still
# get shut down; the combined message is raised at the end via _Fail.
1449 DevCacheManager.RemoveCache(r_path)
1450 except errors.BlockDeviceError, err:
1451 msgs.append(str(err))
1454 for child in disk.children:
1456 BlockdevShutdown(child)
1457 except RPCFail, err:
1458 msgs.append(str(err))
1461 _Fail("; ".join(msgs))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  # resolve every new child; any that cannot be found shows up as None
  new_bdevs = []
  for cdev in new_cdevs:
    new_bdevs.append(_RecursiveFindBD(cdev))
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
# NOTE(review): numbered listing with elided lines; the 'devs' accumulator
# initialization and the 'if rpath is None:'/'else:' branching that selects
# between the dynamic lookup and the static path are missing from this view.
1483 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1484 """Shrink a mirrored block device.
1486 @type parent_cdev: L{objects.Disk}
1487 @param parent_cdev: the disk from which we should remove children
1488 @type new_cdevs: list of L{objects.Disk}
1489 @param new_cdevs: the list of children which we should remove
1493 parent_bdev = _RecursiveFindBD(parent_cdev)
1494 if parent_bdev is None:
1495 _Fail("Can't find parent device '%s' in remove children", parent_cdev)
# For each child: devices without a static path are looked up on the node,
# the others are referenced by their (validated) static device path.
1497 for disk in new_cdevs:
1498 rpath = disk.StaticDevPath()
1500 bd = _RecursiveFindBD(disk)
1502 _Fail("Can't find device %s while removing children", disk)
1504 devs.append(bd.dev_path)
1506 if not utils.IsNormAbsPath(rpath):
1507 _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1509 parent_bdev.RemoveChildren(devs)
# NOTE(review): numbered listing with elided lines; the 'stats' accumulator
# initialization, the 'for dsk in disks:' loop header, the 'if rbd is None:'
# guard and the final 'return stats' are missing from this view.
1512 def BlockdevGetmirrorstatus(disks):
1513 """Get the mirroring status of a list of devices.
1515 @type disks: list of L{objects.Disk}
1516 @param disks: the list of disks which we should query
1519 a list of (mirror_done, estimated_time) tuples, which
1520 are the result of L{bdev.BlockDev.CombinedSyncStatus}
1521 @raise errors.BlockDeviceError: if any of the disks cannot be
1527 rbd = _RecursiveFindBD(dsk)
1529 _Fail("Can't find device %s", dsk)
1531 stats.append(rbd.CombinedSyncStatus())
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  # resolve the children first (depth-first), keeping the None guard since
  # disk.children may be unset for leaf devices
  children = []
  if disk.children:
    children = [_RecursiveFindBD(child) for child in disk.children]
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open
  @return: the found (and opened) block device

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    _Fail("Block device '%s' is not set up", disk)

  rbd.Open()

  return rbd
def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise a the current
      sync status of the device

  """
  try:
    device = _RecursiveFindBD(disk)
  except errors.BlockDeviceError as err:
    _Fail("Failed to find device: %s", err, exc=True)

  if device is None:
    return None

  return device.GetSyncStatus()
# NOTE(review): numbered listing with elided lines; the 'result' accumulator
# initialization, the 'for cf in disks:' loop header, the None-handling
# branches and the final 'return result' are missing from this view.
1595 def BlockdevGetsize(disks):
1596 """Computes the size of the given disks.
1598 If a disk is not found, returns None instead.
1600 @type disks: list of L{objects.Disk}
1601 @param disks: the list of disk to compute the size for
1603 @return: list with elements None if the disk cannot be found,
# A lookup failure is deliberately mapped to a None entry (presumably via
# the elided branches) instead of aborting the whole query -- TODO confirm.
1610 rbd = _RecursiveFindBD(cf)
1611 except errors.BlockDeviceError:
1617 result.append(rbd.GetActualSize())
# NOTE(review): numbered listing with elided lines; the 'try:' around opening
# the device, the closing arguments of the BuildCmd call and the
# 'if result.failed:' guard before the final _Fail are missing from this view.
1621 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1622 """Export a block device to a remote node.
1624 @type disk: L{objects.Disk}
1625 @param disk: the description of the disk to export
1626 @type dest_node: str
1627 @param dest_node: the destination node to export to
1628 @type dest_path: str
1629 @param dest_path: the destination path on the target node
1630 @type cluster_name: str
1631 @param cluster_name: the cluster name, needed for SSH hostalias
1635 real_disk = _OpenRealBD(disk)
1637 # the block size on the read dd is 1MiB to match our units
1638 expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1639 "dd if=%s bs=1048576 count=%s",
1640 real_disk.dev_path, str(disk.size))
1642 # we set here a smaller block size as, due to ssh buffering, more
1643 # than 64-128k will mostly be ignored; we use nocreat to fail if the
1644 # device is not already there or we pass a wrong path; we use
1645 # notrunc to not attempt to truncate an LV device; we use oflag=dsync
1646 # to not buffer too much memory; this means that at best, we flush
1647 # every 64k, which will not be very fast
1648 destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1649 " oflag=dsync", dest_path)
# The write side runs on the destination node over SSH, as the Ganeti
# run-as user.
1651 remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1652 constants.GANETI_RUNAS,
1655 # all commands have been checked, so we're safe to combine them
1656 command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1658 result = utils.RunCmd(["bash", "-c", command])
1661 _Fail("Disk copy command '%s' returned error: %s"
1662 " output: %s", command, result.fail_reason, result.output)
# NOTE(review): numbered listing with elided lines; the second argument of
# the "not in allowed upload targets" _Fail call (presumably file_name) is on
# an elided line.  Comments only; visible code left byte-identical.
1665 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1666 """Write a file to the filesystem.
1668 This allows the master to overwrite(!) a file. It will only perform
1669 the operation if the file belongs to a list of configuration files.
1671 @type file_name: str
1672 @param file_name: the target file name
1674 @param data: the new contents of the file
1676 @param mode: the mode to give the file (can be None)
1678 @param uid: the owner of the file (can be -1 for default)
1680 @param gid: the group of the file (can be -1 for default)
1682 @param atime: the atime to set on the file (can be None)
1684 @param mtime: the mtime to set on the file (can be None)
# Security check: only absolute paths from the allowed whitelist
# (_ALLOWED_UPLOAD_FILES) may be overwritten via this RPC.
1688 if not os.path.isabs(file_name):
1689 _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1691 if file_name not in _ALLOWED_UPLOAD_FILES:
1692 _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
# The payload arrives compressed from the master; decompress before writing.
1695 raw_data = _Decompress(data)
1697 utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1698 atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  @param values: the new ssconf values to write out

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the L{err} argument has an errno attribute, it will be looked up
  and converted into a textual C{E...} description. Otherwise the
  string representation of the error will be returned.

  @type err: L{EnvironmentError}
  @param err: the exception to format
  @rtype: str
  @return: the symbolic errno name (e.g. C{ENOENT}) when available,
      otherwise C{str(err)}

  """
  # Robustness fix: err.errno can be None (e.g. EnvironmentError("msg"))
  # or an OS-specific value missing from errno.errorcode; the previous
  # unconditional errno.errorcode[err.errno] lookup would then raise
  # KeyError/TypeError while formatting an error, masking the real failure.
  if hasattr(err, "errno") and err.errno in errno.errorcode:
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail
# NOTE(review): numbered listing with elided lines; the 'try:' statements
# matching the 'except' clauses below and the final argument of the last
# error message (presumably str(err)) are missing from this view.
1728 def _OSOndiskAPIVersion(os_dir):
1729 """Compute and return the API version of a given OS.
1731 This function will try to read the API version of the OS residing in
1732 the 'os_dir' directory.
1735 @param os_dir: the directory in which we should look for the OS
1737 @return: tuple (status, data) with status denoting the validity and
1738 data holding either the valid versions or an error message
1741 api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1744 st = os.stat(api_file)
1745 except EnvironmentError, err:
1746 return False, ("Required file '%s' not found under path %s: %s" %
1747 (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
# The API file must be a regular file (not a directory, device, ...).
1749 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1750 return False, ("File '%s' in %s is not a regular file" %
1751 (constants.OS_API_FILE, os_dir))
1754 api_versions = utils.ReadFile(api_file).splitlines()
1755 except EnvironmentError, err:
1756 return False, ("Error while reading the API version file at %s: %s" %
1757 (api_file, _ErrnoOrStr(err)))
# One integer version per line; anything non-numeric invalidates the OS.
1760 api_versions = [int(version.strip()) for version in api_versions]
1761 except (TypeError, ValueError), err:
1762 return False, ("API version(s) can't be converted to integer: %s" %
1765 return True, api_versions
# NOTE(review): numbered listing with elided lines; the 'result' accumulator
# initialization, the 'try:' for listing the directory, the status/diagnose
# branches after _TryOSFromDisk and the final 'return result' are missing.
1768 def DiagnoseOS(top_dirs=None):
1769 """Compute the validity for all OSes.
1771 @type top_dirs: list
1772 @param top_dirs: the list of directories in which to
1773 search (if not given defaults to
1774 L{constants.OS_SEARCH_PATH})
1775 @rtype: list of L{objects.OS}
1776 @return: a list of tuples (name, path, status, diagnose, variants)
1777 for all (potential) OSes under all search paths, where:
1778 - name is the (potential) OS name
1779 - path is the full path to the OS
1780 - status True/False is the validity of the OS
1781 - diagnose is the error message for an invalid OS, otherwise empty
1782 - variants is a list of supported OS variants, if any
1785 if top_dirs is None:
1786 top_dirs = constants.OS_SEARCH_PATH
# Each visible entry in each existing search directory is probed as a
# candidate OS; unreadable directories are logged and skipped.
1789 for dir_name in top_dirs:
1790 if os.path.isdir(dir_name):
1792 f_names = utils.ListVisibleFiles(dir_name)
1793 except EnvironmentError, err:
1794 logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1796 for name in f_names:
1797 os_path = utils.PathJoin(dir_name, name)
1798 status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1801 variants = os_inst.supported_variants
1805 result.append((name, os_path, status, diagnose, variants))
# NOTE(review): numbered listing with elided lines; several branch headers
# ('else:', 'if not status:', 'try:'), the second arguments of some error
# messages and the variants/empty-variants handling are missing from this
# view.  Comments only; visible code left byte-identical.
1810 def _TryOSFromDisk(name, base_dir=None):
1811 """Create an OS instance from disk.
1813 This function will return an OS instance if the given name is a
1816 @type base_dir: string
1817 @keyword base_dir: Base directory containing OS installations.
1818 Defaults to a search in all the OS_SEARCH_PATH dirs.
1820 @return: success and either the OS instance if we find a valid one,
1824 if base_dir is None:
1825 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1827 os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1830 return False, "Directory for OS %s not found in search path" % name
1832 status, api_versions = _OSOndiskAPIVersion(os_dir)
1835 return status, api_versions
# The node must support at least one of the OS-declared API versions.
1837 if not constants.OS_API_VERSIONS.intersection(api_versions):
1838 return False, ("API version mismatch for path '%s': found %s, want %s." %
1839 (os_dir, api_versions, constants.OS_API_VERSIONS))
1841 # OS Files dictionary, we will populate it with the absolute path names
1842 os_files = dict.fromkeys(constants.OS_SCRIPTS)
# API >= 15 additionally requires the variants declaration file.
1844 if max(api_versions) >= constants.OS_API_V15:
1845 os_files[constants.OS_VARIANTS_FILE] = ''
# Every required file must exist, be a regular file, and (for the
# scripts) be executable by the owner.
1847 for filename in os_files:
1848 os_files[filename] = utils.PathJoin(os_dir, filename)
1851 st = os.stat(os_files[filename])
1852 except EnvironmentError, err:
1853 return False, ("File '%s' under path '%s' is missing (%s)" %
1854 (filename, os_dir, _ErrnoOrStr(err)))
1856 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1857 return False, ("File '%s' under path '%s' is not a regular file" %
1860 if filename in constants.OS_SCRIPTS:
1861 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1862 return False, ("File '%s' under path '%s' is not executable" %
1866 if constants.OS_VARIANTS_FILE in os_files:
1867 variants_file = os_files[constants.OS_VARIANTS_FILE]
1869 variants = utils.ReadFile(variants_file).splitlines()
1870 except EnvironmentError, err:
1871 return False, ("Error while reading the OS variants file at %s: %s" %
1872 (variants_file, _ErrnoOrStr(err)))
1874 return False, ("No supported os variant found")
1876 os_obj = objects.OS(name=name, path=os_dir,
1877 create_script=os_files[constants.OS_SCRIPT_CREATE],
1878 export_script=os_files[constants.OS_SCRIPT_EXPORT],
1879 import_script=os_files[constants.OS_SCRIPT_IMPORT],
1880 rename_script=os_files[constants.OS_SCRIPT_RENAME],
1881 supported_variants=variants,
1882 api_versions=api_versions)
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
      Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # strip a possible "+variant" suffix before looking on disk
  plain_name = name.split("+", 1)[0]
  found, payload = _TryOSFromDisk(plain_name, base_dir)

  if not found:
    _Fail(payload)

  return payload
# NOTE(review): numbered listing with elided lines; the 'result' dict
# initialization, the assignment target of the api_version computation and
# the 'try:'/'except' around the variant lookup are missing from this view.
1913 def OSEnvironment(instance, inst_os, debug=0):
1914 """Calculate the environment for an os script.
1916 @type instance: L{objects.Instance}
1917 @param instance: target instance for the os script run
1918 @type inst_os: L{objects.OS}
1919 @param inst_os: operating system for which the environment is being built
1920 @type debug: integer
1921 @param debug: debug level (0 or 1, for OS Api 10)
1923 @return: dict of environment variables
1924 @raise errors.BlockDeviceError: if the block device
# The highest API version supported by both node and OS is exported.
1930 max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1931 result['OS_API_VERSION'] = '%d' % api_version
1932 result['INSTANCE_NAME'] = instance.name
1933 result['INSTANCE_OS'] = instance.os
1934 result['HYPERVISOR'] = instance.hypervisor
1935 result['DISK_COUNT'] = '%d' % len(instance.disks)
1936 result['NIC_COUNT'] = '%d' % len(instance.nics)
1937 result['DEBUG_LEVEL'] = '%d' % debug
# OS variant: taken from the "+variant" suffix of instance.os when present,
# otherwise (in the elided fallback) from the OS' supported variants.
1938 if api_version >= constants.OS_API_V15:
1940 variant = instance.os.split('+', 1)[1]
1942 variant = inst_os.supported_variants[0]
1943 result['OS_VARIANT'] = variant
# Per-disk variables: path, access mode, and frontend/backend types.
1944 for idx, disk in enumerate(instance.disks):
1945 real_disk = _OpenRealBD(disk)
1946 result['DISK_%d_PATH' % idx] = real_disk.dev_path
1947 result['DISK_%d_ACCESS' % idx] = disk.mode
1948 if constants.HV_DISK_TYPE in instance.hvparams:
1949 result['DISK_%d_FRONTEND_TYPE' % idx] = \
1950 instance.hvparams[constants.HV_DISK_TYPE]
1951 if disk.dev_type in constants.LDS_BLOCK:
1952 result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1953 elif disk.dev_type == constants.LD_FILE:
1954 result['DISK_%d_BACKEND_TYPE' % idx] = \
1955 'file:%s' % disk.physical_id[0]
# Per-NIC variables: MAC, IP, mode, bridge/link and frontend type.
1956 for idx, nic in enumerate(instance.nics):
1957 result['NIC_%d_MAC' % idx] = nic.mac
1959 result['NIC_%d_IP' % idx] = nic.ip
1960 result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1961 if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1962 result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1963 if nic.nicparams[constants.NIC_LINK]:
1964 result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1965 if constants.HV_NIC_TYPE in instance.hvparams:
1966 result['NIC_%d_FRONTEND_TYPE' % idx] = \
1967 instance.hvparams[constants.HV_NIC_TYPE]
# All backend and hypervisor parameters are exported verbatim.
1969 for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1970 for key, value in source.items():
1971 result["INSTANCE_%s_%s" % (kind, key)] = str(value)
def BlockdevGrow(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @rtype: (status, result)
  @return: a tuple with the status of the operation
      (True/False), and the errors message if status
      is False

  """
  device = _RecursiveFindBD(disk)
  if device is None:
    _Fail("Cannot find block device %s", disk)

  try:
    device.Grow(amount)
  except errors.BlockDeviceError as err:
    _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @return: snapshot disk path

  """
  if disk.dev_type == constants.LD_DRBD8:
    # delegate to the (single) backing-storage child of the DRBD device
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])

  if disk.dev_type != constants.LD_LV:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)

  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)
  # FIXME: choose a saner value for the snapshot size
  # let's stay on the safe side and ask for the full size, for now
  return r_dev.Snapshot(disk.size)
# NOTE(review): numbered listing with elided lines; the fsync parameter of
# utils.WriteFile (if any), the 'nic_total'/'disk_total' counters'
# initialization/increments and the None-snapshot handling are missing.
2030 def FinalizeExport(instance, snap_disks):
2031 """Write out the export configuration information.
2033 @type instance: L{objects.Instance}
2034 @param instance: the instance which we export, used for
2035 saving configuration
2036 @type snap_disks: list of L{objects.Disk}
2037 @param snap_disks: list of snapshot block devices, which
2038 will be used to get the actual name of the dump file
# The export is built in a ".new" staging dir and atomically moved into
# place at the end, replacing any previous export of the same instance.
2043 destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2044 finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2046 config = objects.SerializableConfigParser()
2048 config.add_section(constants.INISECT_EXP)
2049 config.set(constants.INISECT_EXP, 'version', '0')
2050 config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2051 config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2052 config.set(constants.INISECT_EXP, 'os', instance.os)
2053 config.set(constants.INISECT_EXP, 'compression', 'gzip')
2055 config.add_section(constants.INISECT_INS)
2056 config.set(constants.INISECT_INS, 'name', instance.name)
2057 config.set(constants.INISECT_INS, 'memory', '%d' %
2058 instance.beparams[constants.BE_MEMORY])
2059 config.set(constants.INISECT_INS, 'vcpus', '%d' %
2060 instance.beparams[constants.BE_VCPUS])
2061 config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2062 config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2065 for nic_count, nic in enumerate(instance.nics):
2067 config.set(constants.INISECT_INS, 'nic%d_mac' %
2068 nic_count, '%s' % nic.mac)
2069 config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2070 for param in constants.NICS_PARAMETER_TYPES:
2071 config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2072 '%s' % nic.nicparams.get(param, None))
2073 # TODO: redundant: on load can read nics until it doesn't exist
2074 config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2077 for disk_count, disk in enumerate(snap_disks):
2080 config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2081 ('%s' % disk.iv_name))
2082 config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2083 ('%s' % disk.physical_id[1]))
2084 config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2087 config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2089 # New-style hypervisor/backend parameters
# Globally-defined hypervisor parameters are not exported, since they
# belong to the cluster, not the instance.
2091 config.add_section(constants.INISECT_HYP)
2092 for name, value in instance.hvparams.items():
2093 if name not in constants.HVC_GLOBALS:
2094 config.set(constants.INISECT_HYP, name, str(value))
2096 config.add_section(constants.INISECT_BEP)
2097 for name, value in instance.beparams.items():
2098 config.set(constants.INISECT_BEP, name, str(value))
2100 utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2101 data=config.Dumps())
2102 shutil.rmtree(finaldestdir, ignore_errors=True)
2103 shutil.move(destdir, finaldestdir)
# NOTE(review): numbered listing with elided lines; the statement that
# actually loads the file 'cff' into 'config' (around elided line 2120) is
# missing from this view.  Comments only; visible code left byte-identical.
2106 def ExportInfo(dest):
2107 """Get export configuration information.
2110 @param dest: directory containing the export
2112 @rtype: L{objects.SerializableConfigParser}
2113 @return: a serializable config file containing the
2117 cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2119 config = objects.SerializableConfigParser()
# Both the export and the instance sections must be present for the
# export to be considered valid.
2122 if (not config.has_section(constants.INISECT_EXP) or
2123 not config.has_section(constants.INISECT_INS)):
2124 _Fail("Export info file doesn't have the required fields")
2126 return config.Dumps()
# NOTE(review): the enclosing 'def' line of this function is on an elided
# line (presumably 'def ListExports():' -- TODO confirm); only its docstring
# and body are visible here.  Comments only; visible code left byte-identical.
2130 """Return a list of exports currently available on this machine.
2133 @return: list of the exports
# A missing exports directory is reported as an RPC failure rather than
# an empty list.
2136 if os.path.isdir(constants.EXPORT_DIR):
2137 return utils.ListVisibleFiles(constants.EXPORT_DIR)
2139 _Fail("No exports directory")
def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove

  """
  export_dir = utils.PathJoin(constants.EXPORT_DIR, export)

  try:
    shutil.rmtree(export_dir)
  except EnvironmentError as err:
    _Fail("Error while removing the export: %s", err, exc=True)
# NOTE(review): numbered listing with elided lines; the 'msgs' accumulator
# initialization, the 'if dev is None:' guard, its 'continue', the 'try:'
# opening the rename and the final 'if msgs:' guard are missing from this
# view.  Comments only; visible code left byte-identical.
2158 def BlockdevRename(devlist):
2159 """Rename a list of block devices.
2161 @type devlist: list of tuples
2162 @param devlist: list of tuples of the form (disk,
2163 new_logical_id, new_physical_id); disk is an
2164 L{objects.Disk} object describing the current disk,
2165 and new logical_id/physical_id is the name we
2168 @return: True if all renames succeeded, False otherwise
2173 for disk, unique_id in devlist:
2174 dev = _RecursiveFindBD(disk)
2176 msgs.append("Can't find device %s in rename" % str(disk))
2180 old_rpath = dev.dev_path
2181 dev.Rename(unique_id)
2182 new_rpath = dev.dev_path
# If the device path changed, the stale cache entry must be dropped.
2183 if old_rpath != new_rpath:
2184 DevCacheManager.RemoveCache(old_rpath)
2185 # FIXME: we should add the new cache information here, like:
2186 # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2187 # but we don't have the owner here - maybe parse from existing
2188 # cache? for now, we only lose lvm data when we rename, which
2189 # is less critical than DRBD or MD
2190 except errors.BlockDeviceError, err:
2191 msgs.append("Can't rename device '%s' to '%s': %s" %
2192 (dev, unique_id, err))
2193 logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2196 _Fail("; ".join(msgs))
# NOTE(review): numbered listing with elided lines; the assignment of 'cfg'
# (around elided line 2214, presumably via a config accessor -- TODO
# confirm) is missing from this view.  Comments only; code left byte-identical.
2199 def _TransformFileStorageDir(file_storage_dir):
2200 """Checks whether given file_storage_dir is valid.
2202 Checks whether the given file_storage_dir is within the cluster-wide
2203 default file_storage_dir stored in SimpleStore. Only paths under that
2204 directory are allowed.
2206 @type file_storage_dir: str
2207 @param file_storage_dir: the path to check
2209 @return: the normalized path if valid, None otherwise
2212 if not constants.ENABLE_FILE_STORAGE:
2213 _Fail("File storage disabled at configure time")
# Path-containment check: the normalized candidate must live under the
# cluster's base file storage directory.  NOTE(review): commonprefix is a
# character-wise prefix test, so '/srv/ganeti-evil' would pass against
# '/srv/ganeti' -- worth verifying against the callers' inputs.
2215 file_storage_dir = os.path.normpath(file_storage_dir)
2216 base_file_storage_dir = cfg.GetFileStorageDir()
2217 if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2218 base_file_storage_dir):
2219 _Fail("File storage directory '%s' is not under base file"
2220 " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2221 return file_storage_dir
# NOTE(review): numbered listing with elided lines; the second argument of
# the first _Fail call, the 'else:' branch header and the 'try:' opening the
# makedirs call are missing from this view.  Comments only.
2224 def CreateFileStorageDir(file_storage_dir):
2225 """Create file storage directory.
2227 @type file_storage_dir: str
2228 @param file_storage_dir: directory to create
2231 @return: tuple with first element a boolean indicating whether dir
2232 creation was successful or not
# The path is first validated/normalized against the cluster-wide base
# file storage directory (raises RPCFail when outside it).
2235 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2236 if os.path.exists(file_storage_dir):
2237 if not os.path.isdir(file_storage_dir):
2238 _Fail("Specified storage dir '%s' is not a directory",
# Created with mode 0750 so only owner/group can enter the directory.
2242 os.makedirs(file_storage_dir, 0750)
2243 except OSError, err:
2244 _Fail("Cannot create file storage directory '%s': %s",
2245 file_storage_dir, err, exc=True)
# NOTE(review): numbered listing with elided lines; the second argument of
# the first _Fail call and the 'try:' opening the rmdir call are missing
# from this view.  Comments only; visible code left byte-identical.
2248 def RemoveFileStorageDir(file_storage_dir):
2249 """Remove file storage directory.
2251 Remove it only if it's empty. If not log an error and return.
2253 @type file_storage_dir: str
2254 @param file_storage_dir: the directory we should cleanup
2255 @rtype: tuple (success,)
2256 @return: tuple of one element, C{success}, denoting
2257 whether the operation was successful
2260 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2261 if os.path.exists(file_storage_dir):
2262 if not os.path.isdir(file_storage_dir):
2263 _Fail("Specified Storage directory '%s' is not a directory",
2265 # deletes dir only if empty, otherwise we want to fail the rpc call
2267 os.rmdir(file_storage_dir)
2268 except OSError, err:
2269 _Fail("Cannot remove file storage directory '%s': %s",
2270 file_storage_dir, err)
# NOTE(review): numbered listing with elided lines; the 'try:' opening the
# rename, the intermediate 'else:' headers and the guard before the final
# both-locations-exist check are missing from this view.  Comments only.
2273 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2274 """Rename the file storage directory.
2276 @type old_file_storage_dir: str
2277 @param old_file_storage_dir: the current path
2278 @type new_file_storage_dir: str
2279 @param new_file_storage_dir: the name we should rename to
2280 @rtype: tuple (success,)
2281 @return: tuple of one element, C{success}, denoting
2282 whether the operation was successful
# Both endpoints are validated against the cluster-wide base directory
# before any filesystem operation takes place.
2285 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2286 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2287 if not os.path.exists(new_file_storage_dir):
2288 if os.path.isdir(old_file_storage_dir):
2290 os.rename(old_file_storage_dir, new_file_storage_dir)
2291 except OSError, err:
2292 _Fail("Cannot rename '%s' to '%s': %s",
2293 old_file_storage_dir, new_file_storage_dir, err)
2295 _Fail("Specified storage dir '%s' is not a directory",
2296 old_file_storage_dir)
# Reaching here means the destination already exists; that is only
# acceptable when the source is already gone (rename already happened).
2298 if os.path.exists(old_file_storage_dir):
2299 _Fail("Cannot rename '%s' to '%s': both locations exist",
2300 old_file_storage_dir, new_file_storage_dir)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  # both the source and the destination must live inside the queue dir
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: truple
  @return: always True, None
  @warning: the function always returns True

  """
  # the flag is simply the existence of the drain file
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2377 def BlockdevClose(instance_name, disks):
2378 """Closes the given block devices.
2380 This means they will be switched to secondary mode (in case of
2383 @param instance_name: if the argument is not empty, the symlinks
2384 of this instance will be removed
2385 @type disks: list of L{objects.Disk}
2386 @param disks: the list of disks to be closed
2387 @rtype: tuple (success, message)
2388 @return: a tuple of success and message, where success
2389 indicates the succes of the operation, and message
2390 which will contain the error details in case we
2396 rd = _RecursiveFindBD(cf)
2398 _Fail("Can't find device %s", cf)
2405 except errors.BlockDeviceError, err:
2406 msg.append(str(err))
2408 _Fail("Can't make devices secondary: %s", ",".join(msg))
2411 _RemoveBlockDevLinks(instance_name, disks)
2414 def ValidateHVParams(hvname, hvparams):
2415 """Validates the given hypervisor parameters.
2417 @type hvname: string
2418 @param hvname: the hypervisor name
2419 @type hvparams: dict
2420 @param hvparams: the hypervisor parameters to be validated
2425 hv_type = hypervisor.GetHypervisor(hvname)
2426 hv_type.ValidateParameters(hvparams)
2427 except errors.HypervisorError, err:
2428 _Fail(str(err), log=False)
2432 """Demotes the current node from master candidate role.
2435 # try to ensure we're not the master by mistake
2436 master, myself = ssconf.GetMasterAndMyself()
2437 if master == myself:
2438 _Fail("ssconf status shows I'm the master node, will not demote")
2440 result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2441 if not result.failed:
2442 _Fail("The master daemon is running, will not demote")
2445 if os.path.isfile(constants.CLUSTER_CONF_FILE):
2446 utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2447 except EnvironmentError, err:
2448 if err.errno != errno.ENOENT:
2449 _Fail("Error while backing up cluster file: %s", err, exc=True)
2451 utils.RemoveFile(constants.CLUSTER_CONF_FILE)
def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2463 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2464 """Creates a new X509 certificate for SSL/TLS.
2467 @param validity: Validity in seconds
2468 @rtype: tuple; (string, string)
2469 @return: Certificate name and public part
2472 (key_pem, cert_pem) = \
2473 utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
2474 min(validity, _MAX_SSL_CERT_VALIDITY))
2476 cert_dir = tempfile.mkdtemp(dir=cryptodir,
2477 prefix="x509-%s-" % utils.TimestampForFilename())
2479 name = os.path.basename(cert_dir)
2480 assert len(name) > 5
2482 (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2484 utils.WriteFile(key_file, mode=0400, data=key_pem)
2485 utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2487 # Never return private key as it shouldn't leave the node
2488 return (name, cert_pem)
2490 shutil.rmtree(cert_dir, ignore_errors=True)
2494 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2495 """Removes a X509 certificate.
2498 @param name: Certificate name
2501 (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2503 utils.RemoveFile(key_file)
2504 utils.RemoveFile(cert_file)
2508 except EnvironmentError, err:
2509 _Fail("Cannot remove certificate directory '%s': %s",
2513 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2514 """Returns the command for the requested input/output.
2516 @type instance: L{objects.Instance}
2517 @param instance: The instance object
2518 @param mode: Import/export mode
2519 @param ieio: Input/output type
2520 @param ieargs: Input/output arguments
2523 assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2529 if ieio == constants.IEIO_FILE:
2530 (filename, ) = ieargs
2532 if not utils.IsNormAbsPath(filename):
2533 _Fail("Path '%s' is not normalized or absolute", filename)
2535 directory = os.path.normpath(os.path.dirname(filename))
2537 if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2538 constants.EXPORT_DIR):
2539 _Fail("File '%s' is not under exports directory '%s'",
2540 filename, constants.EXPORT_DIR)
2543 utils.Makedirs(directory, mode=0750)
2545 quoted_filename = utils.ShellQuote(filename)
2547 if mode == constants.IEM_IMPORT:
2548 suffix = "> %s" % quoted_filename
2549 elif mode == constants.IEM_EXPORT:
2550 suffix = "< %s" % quoted_filename
2552 elif ieio == constants.IEIO_RAW_DISK:
2555 real_disk = _OpenRealBD(disk)
2557 if mode == constants.IEM_IMPORT:
2558 # we set here a smaller block size as, due to transport buffering, more
2559 # than 64-128k will mostly ignored; we use nocreat to fail if the device
2560 # is not already there or we pass a wrong path; we use notrunc to no
2561 # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2562 # much memory; this means that at best, we flush every 64k, which will
2564 suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2565 " bs=%s oflag=dsync"),
2569 elif mode == constants.IEM_EXPORT:
2570 # the block size on the read dd is 1MiB to match our units
2571 prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2573 str(1024 * 1024), # 1 MB
2576 elif ieio == constants.IEIO_SCRIPT:
2577 (disk, disk_index, ) = ieargs
2579 assert isinstance(disk_index, (int, long))
2581 real_disk = _OpenRealBD(disk)
2583 inst_os = OSFromDisk(instance.os)
2584 env = OSEnvironment(instance, inst_os)
2586 if mode == constants.IEM_IMPORT:
2587 env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2588 env["IMPORT_INDEX"] = str(disk_index)
2589 script = inst_os.import_script
2591 elif mode == constants.IEM_EXPORT:
2592 env["EXPORT_DEVICE"] = real_disk.dev_path
2593 env["EXPORT_INDEX"] = str(disk_index)
2594 script = inst_os.export_script
2596 # TODO: Pass special environment only to script
2597 script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2599 if mode == constants.IEM_IMPORT:
2600 suffix = "| %s" % script_cmd
2602 elif mode == constants.IEM_EXPORT:
2603 prefix = "%s |" % script_cmd
2606 _Fail("Invalid %s I/O mode %r", mode, ieio)
2608 return (env, prefix, suffix)
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))
2620 def StartImportExportDaemon(mode, key_name, ca, host, port, instance,
2622 """Starts an import or export daemon.
2624 @param mode: Import/output mode
2625 @type key_name: string
2626 @param key_name: RSA key name (None to use cluster certificate)
2628 @param ca: Remote CA in PEM format (None to use cluster certificate)
2630 @param host: Remote host for export (None for import)
2632 @param port: Remote port for export (None for import)
2633 @type instance: L{objects.Instance}
2634 @param instance: Instance object
2635 @param ieio: Input/output type
2636 @param ieioargs: Input/output arguments
2639 if mode == constants.IEM_IMPORT:
2642 if not (host is None and port is None):
2643 _Fail("Can not specify host or port on import")
2645 elif mode == constants.IEM_EXPORT:
2648 if host is None or port is None:
2649 _Fail("Host and port must be specified for an export")
2652 _Fail("Invalid mode %r", mode)
2654 if (key_name is None) ^ (ca is None):
2655 _Fail("Cluster certificate can only be used for both key and CA")
2657 (cmd_env, cmd_prefix, cmd_suffix) = \
2658 _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2660 if key_name is None:
2662 key_path = constants.NODED_CERT_FILE
2663 cert_path = constants.NODED_CERT_FILE
2666 (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2668 assert ca is not None
2670 for i in [key_path, cert_path]:
2671 if not os.path.exists(i):
2672 _Fail("File '%s' does not exist" % i)
2674 status_dir = _CreateImportExportStatusDir(prefix)
2676 status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2677 pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2678 ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2682 ca = utils.ReadFile(constants.NODED_CERT_FILE)
2684 utils.WriteFile(ca_file, data=ca, mode=0400)
2687 constants.IMPORT_EXPORT_DAEMON,
2689 "--key=%s" % key_path,
2690 "--cert=%s" % cert_path,
2691 "--ca=%s" % ca_file,
2695 cmd.append("--host=%s" % host)
2698 cmd.append("--port=%s" % port)
2701 cmd.append("--cmd-prefix=%s" % cmd_prefix)
2704 cmd.append("--cmd-suffix=%s" % cmd_suffix)
2706 logfile = _InstanceLogName(prefix, instance.os, instance.name)
2708 # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2709 # support for receiving a file descriptor for output
2710 utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2713 # The import/export name is simply the status directory name
2714 return os.path.basename(status_dir)
2717 shutil.rmtree(status_dir, ignore_errors=True)
2721 def GetImportExportStatus(names):
2722 """Returns import/export daemon status.
2724 @type names: sequence
2725 @param names: List of names
2726 @rtype: List of dicts
2727 @return: Returns a list of the state of each named import/export or None if a
2728 status couldn't be read
2734 status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2738 data = utils.ReadFile(status_file)
2739 except EnvironmentError, err:
2740 if err.errno != errno.ENOENT:
2748 result.append(serializer.LoadJson(data))
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    os.kill(pid, signal.SIGTERM)
def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = utils.HostInfo().name
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs
2808 def DrbdDisconnectNet(nodes_ip, disks):
2809 """Disconnects the network on a list of drbd devices.
2812 bdevs = _FindDisks(nodes_ip, disks)
2818 except errors.BlockDeviceError, err:
2819 _Fail("Can't change network configuration to standalone mode: %s",
2823 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2824 """Attaches the network on a list of drbd devices.
2827 bdevs = _FindDisks(nodes_ip, disks)
2830 for idx, rd in enumerate(bdevs):
2832 _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2833 except EnvironmentError, err:
2834 _Fail("Can't create symlink: %s", err)
2835 # reconnect disks, switch to new master configuration and if
2836 # needed primary mode
2839 rd.AttachNet(multimaster)
2840 except errors.BlockDeviceError, err:
2841 _Fail("Can't change network configuration: %s", err)
2843 # wait until the disks are connected; we need to retry the re-attach
2844 # if the device becomes standalone, as this might happen if the one
2845 # node disconnects and reconnects in a different mode before the
2846 # other node reconnects; in this case, one or both of the nodes will
2847 # decide it has wrong configuration and switch to standalone
2850 all_connected = True
2853 stats = rd.GetProcStatus()
2855 all_connected = (all_connected and
2856 (stats.is_connected or stats.is_in_resync))
2858 if stats.is_standalone:
2859 # peer had different config info and this node became
2860 # standalone, even though this should not happen with the
2861 # new staged way of changing disk configs
2863 rd.AttachNet(multimaster)
2864 except errors.BlockDeviceError, err:
2865 _Fail("Can't change network configuration: %s", err)
2867 if not all_connected:
2868 raise utils.RetryAgain()
2871 # Start with a delay of 100 miliseconds and go up to 5 seconds
2872 utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2873 except utils.RetryTimeout:
2874 _Fail("Timeout in disk reconnecting")
2877 # change to primary mode
2881 except errors.BlockDeviceError, err:
2882 _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    # raises RetryAgain while the device is neither connected nor resyncing
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check before we give up on this device
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running on ram
  try:
    utils.Mlockall()
  except Exception: # pylint: disable-msg=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script
    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      # always remove the temporary input file
      os.unlink(fin_name)

    return result.stdout
3061 class DevCacheManager(object):
3062 """Simple class for managing a cache of block device information.
3065 _DEV_PREFIX = "/dev/"
3066 _ROOT_DIR = constants.BDEV_CACHE_DIR
3069 def _ConvertPath(cls, dev_path):
3070 """Converts a /dev/name path to the cache file name.
3072 This replaces slashes with underscores and strips the /dev
3073 prefix. It then returns the full path to the cache file.
3076 @param dev_path: the C{/dev/} path name
3078 @return: the converted path name
3081 if dev_path.startswith(cls._DEV_PREFIX):
3082 dev_path = dev_path[len(cls._DEV_PREFIX):]
3083 dev_path = dev_path.replace("/", "_")
3084 fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3088 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3089 """Updates the cache information for a given device.
3092 @param dev_path: the pathname of the device
3094 @param owner: the owner (instance name) of the device
3095 @type on_primary: bool
3096 @param on_primary: whether this is the primary
3099 @param iv_name: the instance-visible name of the
3100 device, as in objects.Disk.iv_name
3105 if dev_path is None:
3106 logging.error("DevCacheManager.UpdateCache got a None dev_path")
3108 fpath = cls._ConvertPath(dev_path)
3114 iv_name = "not_visible"
3115 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3117 utils.WriteFile(fpath, data=fdata)
3118 except EnvironmentError, err:
3119 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3122 def RemoveCache(cls, dev_path):
3123 """Remove data for a dev_path.
3125 This is just a wrapper over L{utils.RemoveFile} with a converted
3126 path name and logging.
3129 @param dev_path: the pathname of the device
3134 if dev_path is None:
3135 logging.error("DevCacheManager.RemoveCache got a None dev_path")
3137 fpath = cls._ConvertPath(dev_path)
3139 utils.RemoveFile(fpath)
3140 except EnvironmentError, err:
3141 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)