4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25 the L{UploadFile} function
44 from ganeti import errors
45 from ganeti import utils
46 from ganeti import ssh
47 from ganeti import hypervisor
48 from ganeti import constants
49 from ganeti import bdev
50 from ganeti import objects
51 from ganeti import ssconf
# Kernel-provided random boot id; read by GetNodeInfo below to report "bootid".
54 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
# NOTE(review): elided listing — the docstring terminator and any class body
# statements are missing from this view.  RPCFail is the exception raised by
# _Fail below; per the surviving docstring it carries the error message.
57 class RPCFail(Exception):
58 """Class denoting RPC failure.
60 Its argument is the error message.
# NOTE(review): elided listing — the docstring terminator and the actual
# `raise RPCFail(...)` statement are not visible here; per the docstring the
# function logs the message (with traceback when the `exc` kwarg is truthy,
# via logging.exception) and then raises RPCFail for the daemon to translate
# into a 'failed' RPC result.  Logging is skipped when kwargs["log"] is falsy.
65 def _Fail(msg, *args, **kwargs):
66 """Log an error and then raise an RPCFail exception.
68 This exception is then handled specially in the ganeti daemon and
69 turned into a 'failed' return type. As such, this function is a
70 useful shortcut for logging the error and returning it to the master
74 @param msg: the text of the exception
80 if "log" not in kwargs or kwargs["log"]: # if we should log this error
81 if "exc" in kwargs and kwargs["exc"]:
82 logging.exception(msg)
# NOTE(review): elided listing — the enclosing `def` line is missing from this
# view; judging from the docstring and the returned ssconf.SimpleStore this is
# the module's configuration-store accessor (confirm name against full source).
89 """Simple wrapper to return a SimpleStore.
91 @rtype: L{ssconf.SimpleStore}
92 @return: a SimpleStore instance
95 return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Build an L{ssh.SshRunner} for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, forwarded to the
      SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a freshly constructed SshRunner instance

  """
  # Trivial factory: all the work happens in the SshRunner constructor.
  runner = ssh.SshRunner(cluster_name)
  return runner
# NOTE(review): elided listing — the `return content` body of the
# RPC_ENCODING_NONE branch and the trailing `else:` line are missing from
# this view.  Visible behavior: input must be a 2-tuple/list of
# (encoding, content); zlib+base64 payloads are decoded, unknown encodings
# hit the AssertionError at the bottom.
111 def _Decompress(data):
112 """Unpacks data compressed by the RPC client.
114 @type data: list or tuple
115 @param data: Data sent by RPC client
117 @return: Decompressed data
120 assert isinstance(data, (list, tuple))
121 assert len(data) == 2
122 (encoding, content) = data
123 if encoding == constants.RPC_ENCODING_NONE:
125 elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
126 return zlib.decompress(base64.b64decode(content))
128 raise AssertionError("Unknown data encoding")
# NOTE(review): elided listing — the early return for a non-directory path,
# the `exclude is None` default handling, and the `continue` for excluded
# entries are not visible in this view.  Only regular, non-symlink files are
# removed; excluded paths are normalized before comparison.
131 def _CleanDirectory(path, exclude=None):
132 """Removes all regular files in a directory.
135 @param path: the directory to clean
137 @param exclude: list of files to be excluded, defaults
141 if not os.path.isdir(path):
146 # Normalize excluded paths
147 exclude = [os.path.normpath(i) for i in exclude]
149 for rel_name in utils.ListVisibleFiles(path):
150 full_name = os.path.normpath(os.path.join(path, rel_name))
151 if full_name in exclude:
153 if os.path.isfile(full_name) and not os.path.islink(full_name):
154 utils.RemoveFile(full_name)
# NOTE(review): elided listing — some set entries between the visible
# constants may be missing from this view.  Builds, once at import time, the
# frozenset of files UploadFile may accept: cluster-level files plus each
# hypervisor's ancillary files.
157 def _BuildUploadFileList():
158 """Build the list of allowed upload files.
160 This is abstracted so that it's built only once at module import time.
163 allowed_files = set([
164 constants.CLUSTER_CONF_FILE,
166 constants.SSH_KNOWN_HOSTS_FILE,
167 constants.VNC_PASSWORD_FILE,
168 constants.RAPI_CERT_FILE,
169 constants.RAPI_USERS_FILE,
170 constants.HMAC_CLUSTER_KEY,
173 for hv_name in constants.HYPER_TYPES:
174 hv_class = hypervisor.GetHypervisorClass(hv_name)
175 allowed_files.update(hv_class.GetAncillaryFiles())
177 return frozenset(allowed_files)
# Module-level whitelist consumed by UploadFile (see the module docstring's
# @var note); computed once at import time.
180 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
# NOTE(review): elided listing — the enclosing `def` line is missing from this
# view (name inferred; verify against full source).  Cleans the job-queue
# directory while keeping the queue lock file, then cleans the archive dir.
184 """Removes job queue files and archived jobs.
190 _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
191 _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
# NOTE(review): elided listing — the `def` line, the `try:` opener and the
# `cfg = ...` assignment are missing from this view.  On configuration errors
# the function calls _Fail (which raises RPCFail); otherwise it returns the
# (master_netdev, master_ip, master_node) tuple.  Python 2 `except ..., err`
# syntax is used throughout this file.
195 """Returns master information.
197 This is an utility function to compute master information, either
198 for consumption here or from the node daemon.
201 @return: master_netdev, master_ip, master_name
202 @raise RPCFail: in case of errors
207 master_netdev = cfg.GetMasterNetdev()
208 master_ip = cfg.GetMasterIP()
209 master_node = cfg.GetMasterNode()
210 except errors.ConfigurationError, err:
211 _Fail("Cluster configuration incomplete: %s", err, exc=True)
212 return (master_netdev, master_ip, master_node)
# NOTE(review): elided listing — several interior lines (err_msgs
# initialization, early returns after the ping/own-IP check, the RunCmd
# failure tests, the rapi entry of daemons_params, and the final err_msgs
# guard) are missing from this view.  Visible flow: check whether the master
# IP already answers and whether we own it; add it as a /32 with label
# "<netdev>:0" via `ip address add`; send gratuitous ARP with arping (exit
# code deliberately ignored); then start the master daemons, appending
# --no-voting --yes-do-it to ganeti-masterd when no_voting is set.
215 def StartMaster(start_daemons, no_voting):
216 """Activate local node as master node.
218 The function will always try activate the IP address of the master
219 (unless someone else has it). It will also start the master daemons,
220 based on the start_daemons parameter.
222 @type start_daemons: boolean
223 @param start_daemons: whether to also start the master
224 daemons (ganeti-masterd and ganeti-rapi)
225 @type no_voting: boolean
226 @param no_voting: whether to start ganeti-masterd without a node vote
227 (if start_daemons is True), but still non-interactively
231 # GetMasterInfo will raise an exception if not able to return data
232 master_netdev, master_ip, _ = GetMasterInfo()
235 if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
236 if utils.OwnIpAddress(master_ip):
237 # we already have the ip:
238 logging.debug("Master IP already configured, doing nothing")
240 msg = "Someone else has the master ip, not activating"
244 result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
245 "dev", master_netdev, "label",
246 "%s:0" % master_netdev])
248 msg = "Can't activate master IP: %s" % result.output
252 result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
253 "-s", master_ip, master_ip])
254 # we'll ignore the exit code of arping
256 # and now start the master and rapi daemons
259 'ganeti-masterd': [],
263 daemons_params['ganeti-masterd'].append('--no-voting')
264 daemons_params['ganeti-masterd'].append('--yes-do-it')
265 for daemon in daemons_params:
267 cmd.extend(daemons_params[daemon])
268 result = utils.RunCmd(cmd)
270 msg = "Can't start daemon %s: %s" % (daemon, result.output)
275 _Fail("; ".join(err_msgs))
# NOTE(review): elided listing — the RunCmd failure test and the
# stop_daemons guard around the kill loop are missing from this view.
# Removes the master /32 with `ip address del` (failure is only logged, not
# fatal), then kills rapi and masterd via their pidfiles.
278 def StopMaster(stop_daemons):
279 """Deactivate this node as master.
281 The function will always try to deactivate the IP address of the
282 master. It will also stop the master daemons depending on the
283 stop_daemons parameter.
285 @type stop_daemons: boolean
286 @param stop_daemons: whether to also stop the master daemons
287 (ganeti-masterd and ganeti-rapi)
291 # TODO: log and report back to the caller the error failures; we
292 # need to decide in which case we fail the RPC for this
294 # GetMasterInfo will raise an exception if not able to return data
295 master_netdev, master_ip, _ = GetMasterInfo()
297 result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
298 "dev", master_netdev])
300 logging.error("Can't remove the master IP, error: %s", result.output)
301 # but otherwise ignore the failure
304 # stop/kill the rapi and the master daemon
305 for daemon in constants.RAPI, constants.MASTERD:
306 utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
# NOTE(review): elided listing — @type lines of the docstring, the `try:`
# opener before GetUserFiles and its continuation line are missing from this
# view.  Writes the sshd host keys (Python 2 octal modes 0600/0644), the
# per-user ssh key pair, adds the public key to authorized_keys, and restarts
# sshd via the init script.
309 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
310 """Joins this node to the cluster.
312 This does the following:
313 - updates the hostkeys of the machine (rsa and dsa)
314 - adds the ssh private key to the user
315 - adds the ssh public key to the users' authorized_keys file
318 @param dsa: the DSA private key to write
320 @param dsapub: the DSA public key to write
322 @param rsa: the RSA private key to write
324 @param rsapub: the RSA public key to write
326 @param sshkey: the SSH private key to write
328 @param sshpub: the SSH public key to write
330 @return: the success of the operation
333 sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
334 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
335 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
336 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
337 for name, content, mode in sshd_keys:
338 utils.WriteFile(name, data=content, mode=mode)
341 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
343 except errors.OpExecError, err:
344 _Fail("Error while processing user ssh files: %s", err, exc=True)
346 for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
347 utils.WriteFile(name, data=content, mode=0600)
349 utils.AddAuthorizedKey(auth_keys, sshpub)
351 utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
# NOTE(review): elided listing — the enclosing `def` line, several `try:`
# openers and the confd_pid guard are missing from this view (name inferred;
# verify against full source).  Best-effort cleanup: data dir, user ssh keys,
# cluster secrets (failures only logged), kill confd, then raise
# QuitGanetiException so the node daemon shuts itself down.
355 """Cleans up and remove the current node.
357 This function cleans up and prepares the current node to be removed
360 If processing is successful, then it raises an
361 L{errors.QuitGanetiException} which is used as a special case to
362 shutdown the node daemon.
365 _CleanDirectory(constants.DATA_DIR)
369 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
371 utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
373 utils.RemoveFile(priv_key)
374 utils.RemoveFile(pub_key)
375 except errors.OpExecError:
376 logging.exception("Error while processing ssh files")
379 utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
380 utils.RemoveFile(constants.RAPI_CERT_FILE)
381 utils.RemoveFile(constants.SSL_CERT_FILE)
383 logging.exception("Error while removing cluster secrets")
385 confd_pid = utils.ReadPidFile(utils.DaemonPidFileName(constants.CONFD))
388 utils.KillProcess(confd_pid, timeout=2)
390 # Raise a custom exception (handled in ganeti-noded)
391 raise errors.QuitGanetiException(True, 'Shutdown scheduled')
# NOTE(review): elided listing — the `outputarray = {}` initialization and
# the final `return outputarray` are missing from this view.  Collects VG
# size/free via _GetVGInfo, merges the hypervisor's GetNodeInfo dict, and
# adds the kernel boot id read from _BOOT_ID_PATH.
394 def GetNodeInfo(vgname, hypervisor_type):
395 """Gives back a hash with different information about the node.
397 @type vgname: C{string}
398 @param vgname: the name of the volume group to ask for disk space information
399 @type hypervisor_type: C{str}
400 @param hypervisor_type: the name of the hypervisor to ask for
403 @return: dictionary with the following keys:
404 - vg_size is the size of the configured volume group in MiB
405 - vg_free is the free size of the volume group in MiB
406 - memory_dom0 is the memory allocated for domain0 in MiB
407 - memory_free is the currently available (free) ram in MiB
408 - memory_total is the total number of ram in MiB
412 vginfo = _GetVGInfo(vgname)
413 outputarray['vg_size'] = vginfo['vg_size']
414 outputarray['vg_free'] = vginfo['vg_free']
416 hyper = hypervisor.GetHypervisor(hypervisor_type)
417 hyp_info = hyper.GetNodeInfo()
418 if hyp_info is not None:
419 outputarray.update(hyp_info)
421 outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
# NOTE(review): elided listing — the `result = {}` initialization, parts of
# the nodelist/node-net-test bodies (success handling, my_pip/my_sip
# assignment, the fail-list guard, the `try:` around GetUsedDevs) and the
# final `return result` are missing from this view.  Each NV_* key present in
# `what` triggers the corresponding check; results are keyed the same way.
426 def VerifyNode(what, cluster_name):
427 """Verify the status of the local node.
429 Based on the input L{what} parameter, various checks are done on the
432 If the I{filelist} key is present, this list of
433 files is checksummed and the file/checksum pairs are returned.
435 If the I{nodelist} key is present, we check that we have
436 connectivity via ssh with the target nodes (and check the hostname
439 If the I{node-net-test} key is present, we check that we have
440 connectivity to the given nodes via both primary IP and, if
441 applicable, secondary IPs.
444 @param what: a dictionary of things to check:
445 - filelist: list of files for which to compute checksums
446 - nodelist: list of nodes we should check ssh communication with
447 - node-net-test: list of nodes we should check node daemon port
449 - hypervisor: list with hypervisors to run the verify for
451 @return: a dictionary with the same keys as the input dict, and
452 values representing the result of the checks
457 if constants.NV_HYPERVISOR in what:
458 result[constants.NV_HYPERVISOR] = tmp = {}
459 for hv_name in what[constants.NV_HYPERVISOR]:
460 tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
462 if constants.NV_FILELIST in what:
463 result[constants.NV_FILELIST] = utils.FingerprintFiles(
464 what[constants.NV_FILELIST])
466 if constants.NV_NODELIST in what:
467 result[constants.NV_NODELIST] = tmp = {}
# NOTE(review): the shuffle below mutates the caller-supplied list in place.
468 random.shuffle(what[constants.NV_NODELIST])
469 for node in what[constants.NV_NODELIST]:
470 success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
474 if constants.NV_NODENETTEST in what:
475 result[constants.NV_NODENETTEST] = tmp = {}
476 my_name = utils.HostInfo().name
477 my_pip = my_sip = None
478 for name, pip, sip in what[constants.NV_NODENETTEST]:
484 tmp[my_name] = ("Can't find my own primary/secondary IP"
487 port = utils.GetDaemonPort(constants.NODED)
488 for name, pip, sip in what[constants.NV_NODENETTEST]:
490 if not utils.TcpPing(pip, port, source=my_pip):
491 fail.append("primary")
493 if not utils.TcpPing(sip, port, source=my_sip):
494 fail.append("secondary")
496 tmp[name] = ("failure using the %s interface(s)" %
499 if constants.NV_LVLIST in what:
500 result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
502 if constants.NV_INSTANCELIST in what:
503 result[constants.NV_INSTANCELIST] = GetInstanceList(
504 what[constants.NV_INSTANCELIST])
506 if constants.NV_VGLIST in what:
507 result[constants.NV_VGLIST] = utils.ListVolumeGroups()
509 if constants.NV_VERSION in what:
510 result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
511 constants.RELEASE_VERSION)
513 if constants.NV_HVINFO in what:
514 hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
515 result[constants.NV_HVINFO] = hyper.GetNodeInfo()
517 if constants.NV_DRBDLIST in what:
519 used_minors = bdev.DRBD8.GetUsedDevs().keys()
520 except errors.BlockDeviceError, err:
521 logging.warning("Can't get used minors list", exc_info=True)
# On error the result value degrades to the stringified exception rather
# than a list — callers apparently distinguish by type.
522 used_minors = str(err)
523 result[constants.NV_DRBDLIST] = used_minors
# NOTE(review): elided listing — the `lvs = {}` / separator setup, the
# `continue` for non-matching lines, the virtual-volume skip and the final
# `return lvs` are missing from this view.  Parses `lvs` output into
# {name: (size, inactive, online)}; "virtual" LVs (attr[0] == 'v') are
# skipped per the surviving comment.
528 def GetVolumeList(vg_name):
529 """Compute list of logical volumes and their size.
532 @param vg_name: the volume group whose LVs we should list
535 dictionary of all partions (key) with value being a tuple of
536 their size (in MiB), inactive and online status::
538 {'test1': ('20.06', True, True)}
540 in case of errors, a string is returned with the error
546 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
547 "--separator=%s" % sep,
548 "-olv_name,lv_size,lv_attr", vg_name])
550 _Fail("Failed to list logical volumes, lvs output: %s", result.output)
552 valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
553 for line in result.stdout.splitlines():
555 match = valid_line_re.match(line)
557 logging.error("Invalid line returned from lvs output: '%s'", line)
559 name, size, attr = match.groups()
560 inactive = attr[4] == '-'
561 online = attr[5] == 'o'
562 virtual = attr[0] == 'v'
564 # we don't want to report such volumes as existing, since they
565 # don't really hold data
567 lvs[name] = (size, inactive, online)
def ListVolumeGroups():
  """Return the node's volume groups and their sizes.

  @rtype: dict
  @return: dictionary mapping volume group name to its size

  """
  # Pure delegation: all the work is done by the utils helper.
  return utils.ListVolumeGroups()
# NOTE(review): elided listing — the enclosing `def` line, the --separator
# argument of the lvs call, and the map_line/parse_dev `def` lines are
# missing from this view (name inferred; verify against full source).
# Produces one dict per (LV, device) pair parsed from '|'-separated lvs
# output; parse_dev strips the "(extent)" suffix from the devices field.
584 """List all volumes on this node.
588 A list of dictionaries, each having four keys:
589 - name: the logical volume name,
590 - size: the size of the logical volume
591 - dev: the physical device on which the LV lives
592 - vg: the volume group to which it belongs
594 In case of errors, we return an empty list and log the
597 Note that since a logical volume can live on multiple physical
598 volumes, the resulting list might include a logical volume
602 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
604 "--options=lv_name,lv_size,devices,vg_name"])
606 _Fail("Failed to list logical volumes, lvs output: %s",
611 return dev.split('(')[0]
617 'name': line[0].strip(),
618 'size': line[1].strip(),
619 'dev': parse_dev(line[2].strip()),
620 'vg': line[3].strip(),
623 return [map_line(line.split('|')) for line in result.stdout.splitlines()
624 if line.count('|') >= 3]
# NOTE(review): elided listing — the `missing = []` initialization and the
# `if missing:` guard before _Fail are not visible here.  Fails the RPC
# listing all bridges absent from the node; succeeds silently otherwise.
627 def BridgesExist(bridges_list):
628 """Check if a list of bridges exist on the current node.
631 @return: C{True} if all of them exist, C{False} otherwise
635 for bridge in bridges_list:
636 if not utils.BridgeExists(bridge):
637 missing.append(bridge)
640 _Fail("Missing bridges %s", ", ".join(missing))
# NOTE(review): elided listing — the `results = []` initialization, the
# `try:` opener and the final `return results` are missing from this view.
# Aggregates ListInstances across the requested hypervisors; any hypervisor
# error fails the whole RPC with traceback logging (exc=True).
643 def GetInstanceList(hypervisor_list):
644 """Provides a list of instances.
646 @type hypervisor_list: list
647 @param hypervisor_list: the list of hypervisors to query information
650 @return: a list of all running instances on the current node
651 - instance1.example.com
652 - instance2.example.com
656 for hname in hypervisor_list:
658 names = hypervisor.GetHypervisor(hname).ListInstances()
659 results.extend(names)
660 except errors.HypervisorError, err:
661 _Fail("Error enumerating instances (hypervisor %s): %s",
662 hname, err, exc=True)
# NOTE(review): elided listing — the `output = {}` initialization and the
# final `return output` are missing from this view.  Maps the hypervisor's
# positional GetInstanceInfo tuple (indexes 2/4/5) to named keys; returns an
# empty dict when the instance is unknown (iinfo is None).
667 def GetInstanceInfo(instance, hname):
668 """Gives back the information about an instance as a dictionary.
670 @type instance: string
671 @param instance: the instance name
673 @param hname: the hypervisor type of the instance
676 @return: dictionary with the following keys:
677 - memory: memory size of instance (int)
678 - state: xen state of instance (string)
679 - time: cpu time of instance (float)
684 iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
685 if iinfo is not None:
686 output['memory'] = iinfo[2]
687 output['state'] = iinfo[4]
688 output['time'] = iinfo[5]
# NOTE(review): elided listing — interior lines (if any) between the disk
# loop and the end of the function are missing from this view.  Fails the
# RPC when the instance is not running or when any of its disk symlinks
# (created at start time) is absent — i.e. the instance predates the
# symlink scheme ("not restarted since ganeti 1.2.5").
693 def GetInstanceMigratable(instance):
694 """Gives whether an instance can be migrated.
696 @type instance: L{objects.Instance}
697 @param instance: object representing the instance to be checked.
700 @return: tuple of (result, description) where:
701 - result: whether the instance can be migrated or not
702 - description: a description of the issue, if relevant
705 hyper = hypervisor.GetHypervisor(instance.hypervisor)
706 iname = instance.name
707 if iname not in hyper.ListInstances():
708 _Fail("Instance %s is not running", iname)
710 for idx in range(len(instance.disks)):
711 link_name = _GetBlockDevSymlinkPath(iname, idx)
712 if not os.path.islink(link_name):
713 _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
# NOTE(review): elided listing — the `output = {}` initialization, the
# per-instance `value = {...}` construction, the duplicate-name check guard
# and the final `return output` are missing from this view.  Per the
# surviving comment, when an instance is reported by several hypervisors,
# only the static fields (memory, vcpus) are required to agree; mismatch
# fails the RPC.
716 def GetAllInstancesInfo(hypervisor_list):
717 """Gather data about all instances.
719 This is the equivalent of L{GetInstanceInfo}, except that it
720 computes data for all instances at once, thus being faster if one
721 needs data about more than one instance.
723 @type hypervisor_list: list
724 @param hypervisor_list: list of hypervisors to query for instance data
727 @return: dictionary of instance: data, with data having the following keys:
728 - memory: memory size of instance (int)
729 - state: xen state of instance (string)
730 - time: cpu time of instance (float)
731 - vcpus: the number of vcpus
736 for hname in hypervisor_list:
737 iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
739 for name, _, memory, vcpus, state, times in iinfo:
747 # we only check static parameters, like memory and vcpus,
748 # and not state and time which can change between the
749 # invocations of the different hypervisors
750 for key in 'memory', 'vcpus':
751 if value[key] != output[name][key]:
752 _Fail("Instance %s is running twice"
753 " with different parameters", name)
# NOTE(review): elided listing — the `if reinstall:` guard before setting
# INSTANCE_REINSTALL, the `if result.failed:` test and the result.output
# argument of the error log are missing from this view.  Runs the OS
# create script with the OSEnvironment vars, logging to a timestamped file;
# on failure the last 20 log lines are included in the RPC failure
# (log=False since the details were already logged).
759 def InstanceOsAdd(instance, reinstall):
760 """Add an OS to an instance.
762 @type instance: L{objects.Instance}
763 @param instance: Instance whose OS is to be installed
764 @type reinstall: boolean
765 @param reinstall: whether this is an instance reinstall
769 inst_os = OSFromDisk(instance.os)
771 create_env = OSEnvironment(instance, inst_os)
773 create_env['INSTANCE_REINSTALL'] = "1"
775 logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
776 instance.name, int(time.time()))
778 result = utils.RunCmd([inst_os.create_script], env=create_env,
779 cwd=inst_os.path, output=logfile,)
781 logging.error("os create command '%s' returned error: %s, logfile: %s,"
782 " output: %s", result.cmd, result.fail_reason, logfile,
784 lines = [utils.SafeEncode(val)
785 for val in utils.TailFile(logfile, lines=20)]
786 _Fail("OS create script failed (%s), last lines in the"
787 " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
# NOTE(review): elided listing — old_name argument of the logfile format and
# the `if result.failed:` test are missing from this view.  Mirrors
# InstanceOsAdd but runs the OS rename script with OLD_INSTANCE_NAME set.
# NOTE(review): the error log below says "os create command" in the rename
# path — apparent copy/paste from InstanceOsAdd; a runtime string, so left
# untouched here, but worth fixing upstream.
790 def RunRenameInstance(instance, old_name):
791 """Run the OS rename script for an instance.
793 @type instance: L{objects.Instance}
794 @param instance: Instance whose OS is to be installed
795 @type old_name: string
796 @param old_name: previous instance name
798 @return: the success of the operation
801 inst_os = OSFromDisk(instance.os)
803 rename_env = OSEnvironment(instance, inst_os)
804 rename_env['OLD_INSTANCE_NAME'] = old_name
806 logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
808 instance.name, int(time.time()))
810 result = utils.RunCmd([inst_os.rename_script], env=rename_env,
811 cwd=inst_os.path, output=logfile)
814 logging.error("os create command '%s' returned error: %s output: %s",
815 result.cmd, result.fail_reason, result.output)
816 lines = [utils.SafeEncode(val)
817 for val in utils.TailFile(logfile, lines=20)]
818 _Fail("OS rename script failed (%s), last lines in the"
819 " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
# NOTE(review): elided listing — the `if retval.failed:` test, the
# three-field length check, the retdic update and the final `return retdic`
# are missing from this view.  Parses `vgs` colon-separated output; on any
# error returns the dict with all keys None (per the docstring) instead of
# raising.
822 def _GetVGInfo(vg_name):
823 """Get information about the volume group.
826 @param vg_name: the volume group which we query
829 A dictionary with the following keys:
830 - C{vg_size} is the total size of the volume group in MiB
831 - C{vg_free} is the free size of the volume group in MiB
832 - C{pv_count} are the number of physical disks in that VG
834 If an error occurs during gathering of data, we return the same dict
835 with keys all set to None.
838 retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
840 retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
841 "--nosuffix", "--units=m", "--separator=:", vg_name])
844 logging.error("volume group %s not present", vg_name)
846 valarr = retval.stdout.strip().rstrip(':').split(':')
850 "vg_size": int(round(float(valarr[0]), 0)),
851 "vg_free": int(round(float(valarr[1]), 0)),
852 "pv_count": int(valarr[2]),
854 except ValueError, err:
855 logging.exception("Fail to parse vgs output: %s", err)
857 logging.error("vgs output has the wrong number of fields (expected"
858 " three): %s", str(valarr))
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Compute the symlink path for one of an instance's disks.

  The link lives in C{constants.DISK_LINKS_DIR} and is named
  "<instance_name>:<idx>".

  """
  link_basename = "%s:%d" % (instance_name, idx)
  return os.path.join(constants.DISK_LINKS_DIR, link_basename)
# NOTE(review): elided listing — the `try:` opener, the `except OSError`
# line, the os.remove before re-linking and the final `return link_name` are
# missing from this view.  Creates the per-disk symlink; on EEXIST it
# replaces the link only when it does not already point at device_path.
867 def _SymlinkBlockDev(instance_name, device_path, idx):
868 """Set up symlinks to a instance's block device.
870 This is an auxiliary function run when an instance is start (on the primary
871 node) or when an instance is migrated (on the target node).
874 @param instance_name: the name of the target instance
875 @param device_path: path of the physical block device, on the node
876 @param idx: the disk index
877 @return: absolute path to the disk's symlink
880 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
882 os.symlink(device_path, link_name)
884 if err.errno == errno.EEXIST:
885 if (not os.path.islink(link_name) or
886 os.readlink(link_name) != device_path):
888 os.symlink(device_path, link_name)
# NOTE(review): elided listing — the `try:`/os.remove and the `except OSError`
# line are missing from this view.  Best-effort removal of the per-disk
# symlinks; failures are logged, not raised.
895 def _RemoveBlockDevLinks(instance_name, disks):
896 """Remove the block device symlinks belonging to the given instance.
899 for idx, _ in enumerate(disks):
900 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
901 if os.path.islink(link_name):
905 logging.exception("Can't remove symlink '%s'", link_name)
# NOTE(review): elided listing — the `block_devices = []` initialization, the
# `if device is None:` guard, the `try:`/`except OSError` around the symlink
# call and the final `return block_devices` are missing from this view.
# For each disk: find the assembled block device, create its symlink, and
# collect (disk, link_name) pairs; missing devices raise BlockDeviceError.
908 def _GatherAndLinkBlockDevs(instance):
909 """Set up an instance's block device(s).
911 This is run on the primary node at instance startup. The block
912 devices must be already assembled.
914 @type instance: L{objects.Instance}
915 @param instance: the instance whose disks we shoul assemble
917 @return: list of (disk_object, device_path)
921 for idx, disk in enumerate(instance.disks):
922 device = _RecursiveFindBD(disk)
924 raise errors.BlockDeviceError("Block device '%s' is not set up." %
928 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
930 raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
933 block_devices.append((disk, link_name))
# NOTE(review): elided listing — the early `return` after the
# already-running log and the `try:` opener before _GatherAndLinkBlockDevs
# are missing from this view.  Idempotent start: no-op if running; on
# hypervisor failure the just-created disk symlinks are removed before
# failing the RPC.
938 def StartInstance(instance):
939 """Start an instance.
941 @type instance: L{objects.Instance}
942 @param instance: the instance object
946 running_instances = GetInstanceList([instance.hypervisor])
948 if instance.name in running_instances:
949 logging.info("Instance %s already running, not starting", instance.name)
953 block_devices = _GatherAndLinkBlockDevs(instance)
954 hyper = hypervisor.GetHypervisor(instance.hypervisor)
955 hyper.StartInstance(instance, block_devices)
956 except errors.BlockDeviceError, err:
957 _Fail("Block device error: %s", err, exc=True)
958 except errors.HypervisorError, err:
959 _RemoveBlockDevLinks(instance.name, instance.disks)
960 _Fail("Hypervisor error: %s", err, exc=True)
# NOTE(review): elided listing — the early return when not running, the
# `start = time.time()` / sleep_time initialization, the loop's success
# `break`/return handling, the sleep-time back-off update (the "1.2" comment
# refers to its multiplier) and the `try:` before the forced stop are missing
# from this view.  Overall shape: polite StopInstance retries with growing
# sleeps until `timeout`, then a forced stop, then a final liveness check
# that fails the RPC if the instance survived; disk symlinks are removed on
# success.
963 def InstanceShutdown(instance, timeout):
964 """Shut an instance down.
966 @note: this functions uses polling with a hardcoded timeout.
968 @type instance: L{objects.Instance}
969 @param instance: the instance object
970 @type timeout: integer
971 @param timeout: maximum timeout for soft shutdown
975 hv_name = instance.hypervisor
976 hyper = hypervisor.GetHypervisor(hv_name)
977 running_instances = hyper.ListInstances()
978 iname = instance.name
980 if iname not in running_instances:
981 logging.info("Instance %s not running, doing nothing", iname)
985 end = start + timeout
989 while not tried_once and time.time() < end:
991 hyper.StopInstance(instance, retry=tried_once)
992 except errors.HypervisorError, err:
993 _Fail("Failed to stop instance %s: %s", iname, err)
995 time.sleep(sleep_time)
996 if instance.name not in hyper.ListInstances():
999 # 1.2 behaves particularly good for our case:
1000 # it gives us 10 increasing steps and caps just slightly above 5 seconds
1003 # the shutdown did not succeed
1004 logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1007 hyper.StopInstance(instance, force=True)
1008 except errors.HypervisorError, err:
1009 _Fail("Failed to force stop instance %s: %s", iname, err)
1012 if instance.name in GetInstanceList([hv_name]):
1013 _Fail("Could not shutdown instance %s even by destroy", iname)
1015 _RemoveBlockDevLinks(iname, instance.disks)
# NOTE(review): elided listing — `try:` openers before the reboot calls and
# the trailing `else:` before the invalid-type _Fail are missing from this
# view.  Also note the docstring documents @param timeout while the actual
# parameter is named shutdown_timeout — a docstring/signature mismatch worth
# fixing upstream.  SOFT delegates to the hypervisor's RebootInstance; HARD
# is a full InstanceShutdown + StartInstance; anything else fails the RPC.
1018 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1019 """Reboot an instance.
1021 @type instance: L{objects.Instance}
1022 @param instance: the instance object to reboot
1023 @type reboot_type: str
1024 @param reboot_type: the type of reboot, one the following
1026 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1027 instance OS, do not recreate the VM
1028 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1029 restart the VM (at the hypervisor level)
1030 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1031 not accepted here, since that mode is handled differently, in
1032 cmdlib, and translates into full stop and start of the
1033 instance (instead of a call_instance_reboot RPC)
1034 @type timeout: integer
1035 @param timeout: maximum timeout for soft shutdown
1039 running_instances = GetInstanceList([instance.hypervisor])
1041 if instance.name not in running_instances:
1042 _Fail("Cannot reboot instance %s that is not running", instance.name)
1044 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1045 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1047 hyper.RebootInstance(instance)
1048 except errors.HypervisorError, err:
1049 _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1050 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1052 InstanceShutdown(instance, shutdown_timeout)
1053 return StartInstance(instance)
1054 except errors.HypervisorError, err:
1055 _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1057 _Fail("Invalid reboot_type received: %s", reboot_type)
# NOTE(review): elided listing — the `try:` opener and the final
# `return info` are missing from this view.  Thin wrapper delegating to the
# hypervisor's MigrationInfo, converting HypervisorError into an RPC failure.
1060 def MigrationInfo(instance):
1061 """Gather information about an instance to be migrated.
1063 @type instance: L{objects.Instance}
1064 @param instance: the instance definition
1067 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1069 info = hyper.MigrationInfo(instance)
1070 except errors.HypervisorError, err:
1071 _Fail("Failed to fetch migration information: %s", err, exc=True)
# NOTE(review): elided listing — the `try:` opener before AcceptInstance is
# missing from this view.  Delegates to the hypervisor's AcceptInstance,
# converting HypervisorError into an RPC failure with traceback.
1075 def AcceptInstance(instance, info, target):
1076 """Prepare the node to accept an instance.
1078 @type instance: L{objects.Instance}
1079 @param instance: the instance definition
1080 @type info: string/data (opaque)
1081 @param info: migration information, from the source node
1082 @type target: string
1083 @param target: target host (usually ip), on this node
1086 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1088 hyper.AcceptInstance(instance, info, target)
1089 except errors.HypervisorError, err:
1090 _Fail("Failed to accept instance: %s", err, exc=True)
# NOTE(review): elided listing — the `try:` opener before FinalizeMigration
# is missing from this view.  Delegates to the hypervisor's
# FinalizeMigration, converting HypervisorError into an RPC failure.
1093 def FinalizeMigration(instance, info, success):
1094 """Finalize any preparation to accept an instance.
1096 @type instance: L{objects.Instance}
1097 @param instance: the instance definition
1098 @type info: string/data (opaque)
1099 @param info: migration information, from the source node
1100 @type success: boolean
1101 @param success: whether the migration was a success or a failure
1104 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1106 hyper.FinalizeMigration(instance, info, success)
1107 except errors.HypervisorError, err:
1108 _Fail("Failed to finalize migration: %s", err, exc=True)
# NOTE(review): elided listing — the `try:` opener before MigrateInstance is
# missing from this view.  Delegates the live/non-live migration to the
# hypervisor (note: passes instance.name, not the object), converting
# HypervisorError into an RPC failure.
1111 def MigrateInstance(instance, target, live):
1112 """Migrates an instance to another node.
1114 @type instance: L{objects.Instance}
1115 @param instance: the instance definition
1116 @type target: string
1117 @param target: the target node name
1119 @param live: whether the migration should be done live or not (the
1120 interpretation of this parameter is left to the hypervisor)
1122 @return: a tuple of (success, msg) where:
1123 - succes is a boolean denoting the success/failure of the operation
1124 - msg is a string with details in case of failure
1127 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1130 hyper.MigrateInstance(instance.name, target, live)
1131 except errors.HypervisorError, err:
1132 _Fail("Failed to migrate instance: %s", err, exc=True)
# NOTE(review): elided listing — the `clist = []` initialization, the guard
# before the children loop, the child.Open()/SetSyncSpeed lines inside the
# first loop, the clist.append, the Assemble call after creation and the
# `if info:` guard before SetInfo are missing from this view.  Overall
# shape: recursively assemble children (opened read-write on the primary),
# bdev.Create the device, assemble/open it where appropriate, register it in
# the DevCacheManager, tag it with `info`, and return its unique_id.
1135 def BlockdevCreate(disk, size, owner, on_primary, info):
1136 """Creates a block device for an instance.
1138 @type disk: L{objects.Disk}
1139 @param disk: the object describing the disk we should create
1141 @param size: the size of the physical underlying device, in MiB
1143 @param owner: the name of the instance for which disk is created,
1144 used for device cache data
1145 @type on_primary: boolean
1146 @param on_primary: indicates if it is the primary node or not
1148 @param info: string that will be sent to the physical device
1149 creation, used for example to set (LVM) tags on LVs
1151 @return: the new unique_id of the device (this can sometime be
1152 computed only after creation), or None. On secondary nodes,
1153 it's not required to return anything.
1158 for child in disk.children:
1160 crdev = _RecursiveAssembleBD(child, owner, on_primary)
1161 except errors.BlockDeviceError, err:
1162 _Fail("Can't assemble device %s: %s", child, err)
1163 if on_primary or disk.AssembleOnSecondary():
1164 # we need the children open in case the device itself has to
1168 except errors.BlockDeviceError, err:
1169 _Fail("Can't make child '%s' read-write: %s", child, err)
1173 device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1174 except errors.BlockDeviceError, err:
1175 _Fail("Can't create block device: %s", err)
1177 if on_primary or disk.AssembleOnSecondary():
1180 except errors.BlockDeviceError, err:
1181 _Fail("Can't assemble device after creation, unusual event: %s", err)
1182 device.SetSyncSpeed(constants.SYNC_SPEED)
1183 if on_primary or disk.OpenOnSecondary():
1185 device.Open(force=True)
1186 except errors.BlockDeviceError, err:
1187 _Fail("Can't make device r/w after creation, unusual event: %s", err)
1188 DevCacheManager.UpdateCache(device.dev_path, owner,
1189 on_primary, disk.iv_name)
1191 device.SetInfo(info)
1193 return device.unique_id
1196 def BlockdevRemove(disk):
1197 """Remove a block device.
1199 @note: This is intended to be called recursively.
1201 @type disk: L{objects.Disk}
1202 @param disk: the disk object we should remove
1204 @return: the success of the operation
1209 rdev = _RecursiveFindBD(disk)
1210 except errors.BlockDeviceError, err:
1211 # probably can't attach
1212 logging.info("Can't attach to device %s in remove", disk)
1214 if rdev is not None:
1215 r_path = rdev.dev_path
1218 except errors.BlockDeviceError, err:
1219 msgs.append(str(err))
1221 DevCacheManager.RemoveCache(r_path)
1224 for child in disk.children:
1226 BlockdevRemove(child)
1227 except RPCFail, err:
1228 msgs.append(str(err))
1231 _Fail("; ".join(msgs))
1234 def _RecursiveAssembleBD(disk, owner, as_primary):
1235 """Activate a block device for an instance.
1237 This is run on the primary and secondary nodes for an instance.
1239 @note: this function is called recursively.
1241 @type disk: L{objects.Disk}
1242 @param disk: the disk we try to assemble
1244 @param owner: the name of the instance which owns the disk
1245 @type as_primary: boolean
1246 @param as_primary: if we should make the block device
1249 @return: the assembled device or None (in case no device
1251 @raise errors.BlockDeviceError: in case there is an error
1252 during the activation of the children or the device
1258 mcn = disk.ChildrenNeeded()
1260 mcn = 0 # max number of Nones allowed
1262 mcn = len(disk.children) - mcn # max number of Nones
1263 for chld_disk in disk.children:
1265 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1266 except errors.BlockDeviceError, err:
1267 if children.count(None) >= mcn:
1270 logging.error("Error in child activation (but continuing): %s",
1272 children.append(cdev)
1274 if as_primary or disk.AssembleOnSecondary():
1275 r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1276 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1278 if as_primary or disk.OpenOnSecondary():
1280 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1281 as_primary, disk.iv_name)
1288 def BlockdevAssemble(disk, owner, as_primary):
1289 """Activate a block device for an instance.
1291 This is a wrapper over _RecursiveAssembleBD.
1293 @rtype: str or boolean
1294 @return: a C{/dev/...} path for primary nodes, and
1295 C{True} for secondary nodes
1299 result = _RecursiveAssembleBD(disk, owner, as_primary)
1300 if isinstance(result, bdev.BlockDev):
1301 result = result.dev_path
1302 except errors.BlockDeviceError, err:
1303 _Fail("Error while assembling disk: %s", err, exc=True)
1308 def BlockdevShutdown(disk):
1309 """Shut down a block device.
1311 First, if the device is assembled (Attach() is successful), then
1312 the device is shutdown. Then the children of the device are
1315 This function is called recursively. Note that we don't cache the
1316 children or such, as oppossed to assemble, shutdown of different
1317 devices doesn't require that the upper device was active.
1319 @type disk: L{objects.Disk}
1320 @param disk: the description of the disk we should
1326 r_dev = _RecursiveFindBD(disk)
1327 if r_dev is not None:
1328 r_path = r_dev.dev_path
1331 DevCacheManager.RemoveCache(r_path)
1332 except errors.BlockDeviceError, err:
1333 msgs.append(str(err))
1336 for child in disk.children:
1338 BlockdevShutdown(child)
1339 except RPCFail, err:
1340 msgs.append(str(err))
1343 _Fail("; ".join(msgs))
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  # resolve every requested child to an attached block device
  new_bdevs = [_RecursiveFindBD(cdev) for cdev in new_cdevs]
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      # no static path known: the device must be attached to be located
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1436 def BlockdevFind(disk):
1437 """Check if a device is activated.
1439 If it is, return information about the real device.
1441 @type disk: L{objects.Disk}
1442 @param disk: the disk to find
1443 @rtype: None or objects.BlockDevStatus
1444 @return: None if the disk cannot be found, otherwise a the current
1449 rbd = _RecursiveFindBD(disk)
1450 except errors.BlockDeviceError, err:
1451 _Fail("Failed to find device: %s", err, exc=True)
1456 return rbd.GetSyncStatus()
1459 def BlockdevGetsize(disks):
1460 """Computes the size of the given disks.
1462 If a disk is not found, returns None instead.
1464 @type disks: list of L{objects.Disk}
1465 @param disks: the list of disk to compute the size for
1467 @return: list with elements None if the disk cannot be found,
1474 rbd = _RecursiveFindBD(cf)
1475 except errors.BlockDeviceError, err:
1481 result.append(rbd.GetActualSize())
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  # refuse anything outside the whitelist of configuration files
  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  # _GetConfig() is the module's canonical way to obtain a SimpleStore
  _GetConfig().WriteFiles(values)
1578 def _ErrnoOrStr(err):
1579 """Format an EnvironmentError exception.
1581 If the L{err} argument has an errno attribute, it will be looked up
1582 and converted into a textual C{E...} description. Otherwise the
1583 string representation of the error will be returned.
1585 @type err: L{EnvironmentError}
1586 @param err: the exception to format
1589 if hasattr(err, 'errno'):
1590 detail = errno.errorcode[err.errno]
1596 def _OSOndiskAPIVersion(name, os_dir):
1597 """Compute and return the API version of a given OS.
1599 This function will try to read the API version of the OS given by
1600 the 'name' parameter and residing in the 'os_dir' directory.
1603 @param name: the OS name we should look for
1605 @param os_dir: the directory inwhich we should look for the OS
1607 @return: tuple (status, data) with status denoting the validity and
1608 data holding either the vaid versions or an error message
1611 api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])
1614 st = os.stat(api_file)
1615 except EnvironmentError, err:
1616 return False, ("Required file '%s' not found under path %s: %s" %
1617 (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1619 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1620 return False, ("File '%s' in %s is not a regular file" %
1621 (constants.OS_API_FILE, os_dir))
1624 api_versions = utils.ReadFile(api_file).splitlines()
1625 except EnvironmentError, err:
1626 return False, ("Error while reading the API version file at %s: %s" %
1627 (api_file, _ErrnoOrStr(err)))
1630 api_versions = [int(version.strip()) for version in api_versions]
1631 except (TypeError, ValueError), err:
1632 return False, ("API version(s) can't be converted to integer: %s" %
1635 return True, api_versions
1638 def DiagnoseOS(top_dirs=None):
1639 """Compute the validity for all OSes.
1641 @type top_dirs: list
1642 @param top_dirs: the list of directories in which to
1643 search (if not given defaults to
1644 L{constants.OS_SEARCH_PATH})
1645 @rtype: list of L{objects.OS}
1646 @return: a list of tuples (name, path, status, diagnose, variants)
1647 for all (potential) OSes under all search paths, where:
1648 - name is the (potential) OS name
1649 - path is the full path to the OS
1650 - status True/False is the validity of the OS
1651 - diagnose is the error message for an invalid OS, otherwise empty
1652 - variants is a list of supported OS variants, if any
1655 if top_dirs is None:
1656 top_dirs = constants.OS_SEARCH_PATH
1659 for dir_name in top_dirs:
1660 if os.path.isdir(dir_name):
1662 f_names = utils.ListVisibleFiles(dir_name)
1663 except EnvironmentError, err:
1664 logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1666 for name in f_names:
1667 os_path = os.path.sep.join([dir_name, name])
1668 status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1671 variants = os_inst.supported_variants
1675 result.append((name, os_path, status, diagnose, variants))
1680 def _TryOSFromDisk(name, base_dir=None):
1681 """Create an OS instance from disk.
1683 This function will return an OS instance if the given name is a
1686 @type base_dir: string
1687 @keyword base_dir: Base directory containing OS installations.
1688 Defaults to a search in all the OS_SEARCH_PATH dirs.
1690 @return: success and either the OS instance if we find a valid one,
1694 if base_dir is None:
1695 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1697 return False, "Directory for OS %s not found in search path" % name
1699 os_dir = os.path.sep.join([base_dir, name])
1701 status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1704 return status, api_versions
1706 if not constants.OS_API_VERSIONS.intersection(api_versions):
1707 return False, ("API version mismatch for path '%s': found %s, want %s." %
1708 (os_dir, api_versions, constants.OS_API_VERSIONS))
1710 # OS Files dictionary, we will populate it with the absolute path names
1711 os_files = dict.fromkeys(constants.OS_SCRIPTS)
1713 if max(api_versions) >= constants.OS_API_V15:
1714 os_files[constants.OS_VARIANTS_FILE] = ''
1716 for name in os_files:
1717 os_files[name] = os.path.sep.join([os_dir, name])
1720 st = os.stat(os_files[name])
1721 except EnvironmentError, err:
1722 return False, ("File '%s' under path '%s' is missing (%s)" %
1723 (name, os_dir, _ErrnoOrStr(err)))
1725 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1726 return False, ("File '%s' under path '%s' is not a regular file" %
1729 if name in constants.OS_SCRIPTS:
1730 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1731 return False, ("File '%s' under path '%s' is not executable" %
1735 if constants.OS_VARIANTS_FILE in os_files:
1736 variants_file = os_files[constants.OS_VARIANTS_FILE]
1738 variants = utils.ReadFile(variants_file).splitlines()
1739 except EnvironmentError, err:
1740 return False, ("Error while reading the OS variants file at %s: %s" %
1741 (variants_file, _ErrnoOrStr(err)))
1743 return False, ("No supported os variant found")
1745 os_obj = objects.OS(name=name, path=os_dir,
1746 create_script=os_files[constants.OS_SCRIPT_CREATE],
1747 export_script=os_files[constants.OS_SCRIPT_EXPORT],
1748 import_script=os_files[constants.OS_SCRIPT_IMPORT],
1749 rename_script=os_files[constants.OS_SCRIPT_RENAME],
1750 supported_variants=variants,
1751 api_versions=api_versions)
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type name: str
  @param name: the OS name, possibly including a "+variant" suffix
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # strip an optional "+variant" suffix before the on-disk lookup
  name_only = name.split("+", 1)[0]
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload
def OSEnvironment(instance, os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type os: L{objects.OS}
  @param os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  if api_version >= constants.OS_API_V15:
    try:
      # instance.os may carry a "+variant" suffix
      variant = instance.os.split('+', 1)[1]
    except IndexError:
      variant = os.supported_variants[0]
    result['OS_VARIANT'] = variant
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
1847 def BlockdevGrow(disk, amount):
1848 """Grow a stack of block devices.
1850 This function is called recursively, with the childrens being the
1851 first ones to resize.
1853 @type disk: L{objects.Disk}
1854 @param disk: the disk to be grown
1855 @rtype: (status, result)
1856 @return: a tuple with the status of the operation
1857 (True/False), and the errors message if status
1861 r_dev = _RecursiveFindBD(disk)
1863 _Fail("Cannot find block device %s", disk)
1867 except errors.BlockDeviceError, err:
1868 _Fail("Failed to grow block device: %s", err, exc=True)
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return BlockdevSnapshot(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return BlockdevSnapshot(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
1905 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1906 """Export a block device snapshot to a remote node.
1908 @type disk: L{objects.Disk}
1909 @param disk: the description of the disk to export
1910 @type dest_node: str
1911 @param dest_node: the destination node to export to
1912 @type instance: L{objects.Instance}
1913 @param instance: the instance object to whom the disk belongs
1914 @type cluster_name: str
1915 @param cluster_name: the cluster name, needed for SSH hostalias
1917 @param idx: the index of the disk in the instance's disk list,
1918 used to export to the OS scripts environment
1922 inst_os = OSFromDisk(instance.os)
1923 export_env = OSEnvironment(instance, inst_os)
1925 export_script = inst_os.export_script
1927 logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1928 instance.name, int(time.time()))
1929 if not os.path.exists(constants.LOG_OS_DIR):
1930 os.mkdir(constants.LOG_OS_DIR, 0750)
1931 real_disk = _RecursiveFindBD(disk)
1932 if real_disk is None:
1933 _Fail("Block device '%s' is not set up", disk)
1937 export_env['EXPORT_DEVICE'] = real_disk.dev_path
1938 export_env['EXPORT_INDEX'] = str(idx)
1940 destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1941 destfile = disk.physical_id[1]
1943 # the target command is built out of three individual commands,
1944 # which are joined by pipes; we check each individual command for
1946 expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1947 inst_os.path, export_script, logfile)
1951 destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1952 destdir, destdir, destfile)
1953 remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1954 constants.GANETI_RUNAS,
1957 # all commands have been checked, so we're safe to combine them
1958 command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1960 result = utils.RunCmd(["bash", "-c", command], env=export_env)
1963 _Fail("OS snapshot export command '%s' returned error: %s"
1964 " output: %s", command, result.fail_reason, result.output)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file
  @rtype: None

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    # NOTE(review): value restored from the nicparams link, consistent
    # with OSEnvironment above -- confirm against the import side
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.nicparams[constants.NIC_LINK])
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # replace any previous export of the same instance atomically-ish:
  # remove the old final dir (ignoring errors), then move the new one
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
2053 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2054 """Import an os image into an instance.
2056 @type instance: L{objects.Instance}
2057 @param instance: instance to import the disks into
2058 @type src_node: string
2059 @param src_node: source node for the disk images
2060 @type src_images: list of string
2061 @param src_images: absolute paths of the disk images
2062 @rtype: list of boolean
2063 @return: each boolean represent the success of importing the n-th disk
2066 inst_os = OSFromDisk(instance.os)
2067 import_env = OSEnvironment(instance, inst_os)
2068 import_script = inst_os.import_script
2070 logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2071 instance.name, int(time.time()))
2072 if not os.path.exists(constants.LOG_OS_DIR):
2073 os.mkdir(constants.LOG_OS_DIR, 0750)
2076 impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2077 import_script, logfile)
2080 for idx, image in enumerate(src_images):
2082 destcmd = utils.BuildShellCmd('cat %s', image)
2083 remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2084 constants.GANETI_RUNAS,
2086 command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2087 import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2088 import_env['IMPORT_INDEX'] = str(idx)
2089 result = utils.RunCmd(command, env=import_env)
2091 logging.error("Disk import command '%s' returned error: %s"
2092 " output: %s", command, result.fail_reason,
2094 final_result.append("error importing disk %d: %s, %s" %
2095 (idx, result.fail_reason, result.output[-100]))
2098 _Fail("; ".join(final_result), log=False)
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  else:
    _Fail("No exports directory")
2114 def RemoveExport(export):
2115 """Remove an existing export from the node.
2118 @param export: the name of the export to remove
2122 target = os.path.join(constants.EXPORT_DIR, export)
2125 shutil.rmtree(target)
2126 except EnvironmentError, err:
2127 _Fail("Error while removing the export: %s", err, exc=True)
2130 def BlockdevRename(devlist):
2131 """Rename a list of block devices.
2133 @type devlist: list of tuples
2134 @param devlist: list of tuples of the form (disk,
2135 new_logical_id, new_physical_id); disk is an
2136 L{objects.Disk} object describing the current disk,
2137 and new logical_id/physical_id is the name we
2140 @return: True if all renames succeeded, False otherwise
2145 for disk, unique_id in devlist:
2146 dev = _RecursiveFindBD(disk)
2148 msgs.append("Can't find device %s in rename" % str(disk))
2152 old_rpath = dev.dev_path
2153 dev.Rename(unique_id)
2154 new_rpath = dev.dev_path
2155 if old_rpath != new_rpath:
2156 DevCacheManager.RemoveCache(old_rpath)
2157 # FIXME: we should add the new cache information here, like:
2158 # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2159 # but we don't have the owner here - maybe parse from existing
2160 # cache? for now, we only lose lvm data when we rename, which
2161 # is less critical than DRBD or MD
2162 except errors.BlockDeviceError, err:
2163 msgs.append("Can't rename device '%s' to '%s': %s" %
2164 (dev, unique_id, err))
2165 logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2168 _Fail("; ".join(msgs))
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks wheter the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check
  @rtype: str
  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # containment check on whole path components; a plain
  # os.path.commonprefix comparison is character-wise and would accept
  # e.g. "/srv/ganeti-evil" for base "/srv/ganeti"
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
2194 def CreateFileStorageDir(file_storage_dir):
2195 """Create file storage directory.
2197 @type file_storage_dir: str
2198 @param file_storage_dir: directory to create
2201 @return: tuple with first element a boolean indicating wheter dir
2202 creation was successful or not
2205 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2206 if os.path.exists(file_storage_dir):
2207 if not os.path.isdir(file_storage_dir):
2208 _Fail("Specified storage dir '%s' is not a directory",
2212 os.makedirs(file_storage_dir, 0750)
2213 except OSError, err:
2214 _Fail("Cannot create file storage directory '%s': %s",
2215 file_storage_dir, err, exc=True)
2218 def RemoveFileStorageDir(file_storage_dir):
2219 """Remove file storage directory.
2221 Remove it only if it's empty. If not log an error and return.
2223 @type file_storage_dir: str
2224 @param file_storage_dir: the directory we should cleanup
2225 @rtype: tuple (success,)
2226 @return: tuple of one element, C{success}, denoting
2227 whether the operation was successful
2230 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2231 if os.path.exists(file_storage_dir):
2232 if not os.path.isdir(file_storage_dir):
2233 _Fail("Specified Storage directory '%s' is not a directory",
2235 # deletes dir only if empty, otherwise we want to fail the rpc call
2237 os.rmdir(file_storage_dir)
2238 except OSError, err:
2239 _Fail("Cannot remove file storage directory '%s': %s",
2240 file_storage_dir, err)
2243 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2244 """Rename the file storage directory.
2246 @type old_file_storage_dir: str
2247 @param old_file_storage_dir: the current path
2248 @type new_file_storage_dir: str
2249 @param new_file_storage_dir: the name we should rename to
2250 @rtype: tuple (success,)
2251 @return: tuple of one element, C{success}, denoting
2252 whether the operation was successful
2255 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2256 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2257 if not os.path.exists(new_file_storage_dir):
2258 if os.path.isdir(old_file_storage_dir):
2260 os.rename(old_file_storage_dir, new_file_storage_dir)
2261 except OSError, err:
2262 _Fail("Cannot rename '%s' to '%s': %s",
2263 old_file_storage_dir, new_file_storage_dir, err)
2265 _Fail("Specified storage dir '%s' is not a directory",
2266 old_file_storage_dir)
2268 if os.path.exists(old_file_storage_dir):
2269 _Fail("Cannot rename '%s' to '%s': both locations exist",
2270 old_file_storage_dir, new_file_storage_dir)
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # containment check on whole path components; a plain
  # os.path.commonprefix comparison is character-wise and would accept
  # sibling directories sharing a name prefix with the queue dir
  result = (file_name == queue_dir or
            file_name.startswith(queue_dir + os.sep))

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  # both the source and the destination must live inside the queue
  # directory, otherwise we refuse the operation
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  utils.RenameFile(old, new, mkdir=True)
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: tuple
  @return: always True, None
  @warning: the function always returns True

  """
  # the drain state is represented purely by the existence of the
  # drain file: create an empty file to drain, remove it to un-drain
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2347 def BlockdevClose(instance_name, disks):
2348 """Closes the given block devices.
2350 This means they will be switched to secondary mode (in case of
2353 @param instance_name: if the argument is not empty, the symlinks
2354 of this instance will be removed
2355 @type disks: list of L{objects.Disk}
2356 @param disks: the list of disks to be closed
2357 @rtype: tuple (success, message)
2358 @return: a tuple of success and message, where success
2359 indicates the succes of the operation, and message
2360 which will contain the error details in case we
2366 rd = _RecursiveFindBD(cf)
2368 _Fail("Can't find device %s", cf)
2375 except errors.BlockDeviceError, err:
2376 msg.append(str(err))
2378 _Fail("Can't make devices secondary: %s", ",".join(msg))
2381 _RemoveBlockDevLinks(instance_name, disks)
2384 def ValidateHVParams(hvname, hvparams):
2385 """Validates the given hypervisor parameters.
2387 @type hvname: string
2388 @param hvname: the hypervisor name
2389 @type hvparams: dict
2390 @param hvparams: the hypervisor parameters to be validated
2395 hv_type = hypervisor.GetHypervisor(hvname)
2396 hv_type.ValidateParameters(hvparams)
2397 except errors.HypervisorError, err:
2398 _Fail(str(err), log=False)
2402 """Demotes the current node from master candidate role.
2405 # try to ensure we're not the master by mistake
2406 master, myself = ssconf.GetMasterAndMyself()
2407 if master == myself:
2408 _Fail("ssconf status shows I'm the master node, will not demote")
2409 pid_file = utils.DaemonPidFileName(constants.MASTERD)
2410 if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2411 _Fail("The master daemon is running, will not demote")
2413 if os.path.isfile(constants.CLUSTER_CONF_FILE):
2414 utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2415 except EnvironmentError, err:
2416 if err.errno != errno.ENOENT:
2417 _Fail("Error while backing up cluster file: %s", err, exc=True)
2418 utils.RemoveFile(constants.CLUSTER_CONF_FILE)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = utils.HostInfo().name
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  # NOTE(review): the loop and return were partly elided in the source
  # listing; reconstructed from the docstring and callers — verify
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs
2440 def DrbdDisconnectNet(nodes_ip, disks):
2441 """Disconnects the network on a list of drbd devices.
2444 bdevs = _FindDisks(nodes_ip, disks)
2450 except errors.BlockDeviceError, err:
2451 _Fail("Can't change network configuration to standalone mode: %s",
2455 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2456 """Attaches the network on a list of drbd devices.
2459 bdevs = _FindDisks(nodes_ip, disks)
2462 for idx, rd in enumerate(bdevs):
2464 _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2465 except EnvironmentError, err:
2466 _Fail("Can't create symlink: %s", err)
2467 # reconnect disks, switch to new master configuration and if
2468 # needed primary mode
2471 rd.AttachNet(multimaster)
2472 except errors.BlockDeviceError, err:
2473 _Fail("Can't change network configuration: %s", err)
2474 # wait until the disks are connected; we need to retry the re-attach
2475 # if the device becomes standalone, as this might happen if the one
2476 # node disconnects and reconnects in a different mode before the
2477 # other node reconnects; in this case, one or both of the nodes will
2478 # decide it has wrong configuration and switch to standalone
2479 RECONNECT_TIMEOUT = 2 * 60
2480 sleep_time = 0.100 # start with 100 miliseconds
2481 timeout_limit = time.time() + RECONNECT_TIMEOUT
2482 while time.time() < timeout_limit:
2483 all_connected = True
2485 stats = rd.GetProcStatus()
2486 if not (stats.is_connected or stats.is_in_resync):
2487 all_connected = False
2488 if stats.is_standalone:
2489 # peer had different config info and this node became
2490 # standalone, even though this should not happen with the
2491 # new staged way of changing disk configs
2493 rd.AttachNet(multimaster)
2494 except errors.BlockDeviceError, err:
2495 _Fail("Can't change network configuration: %s", err)
2498 time.sleep(sleep_time)
2499 sleep_time = min(5, sleep_time * 1.5)
2500 if not all_connected:
2501 _Fail("Timeout in disk reconnecting")
2503 # change to primary mode
2507 except errors.BlockDeviceError, err:
2508 _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    # a device counts as done only when it's connected and no longer
    # resyncing; track the slowest device's progress for reporting
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # parent: answer the RPC immediately, the child does the work
    return "Reboot scheduled in 5 seconds"
  time.sleep(5)
  hyper.PowercycleNode()
2549 class HooksRunner(object):
2552 This class is instantiated on the node side (ganeti-noded) and not
2556 RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2558 def __init__(self, hooks_base_dir=None):
2559 """Constructor for hooks runner.
2561 @type hooks_base_dir: str or None
2562 @param hooks_base_dir: if not None, this overrides the
2563 L{constants.HOOKS_BASE_DIR} (useful for unittests)
2566 if hooks_base_dir is None:
2567 hooks_base_dir = constants.HOOKS_BASE_DIR
2568 self._BASE_DIR = hooks_base_dir
2571 def ExecHook(script, env):
2572 """Exec one hook script.
2575 @param script: the full path to the script
2577 @param env: the environment with which to exec the script
2578 @rtype: tuple (success, message)
2579 @return: a tuple of success and message, where success
2580 indicates the succes of the operation, and message
2581 which will contain the error details in case we
2585 # exec the process using subprocess and log the output
2588 fdstdin = open("/dev/null", "r")
2589 child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2590 stderr=subprocess.STDOUT, close_fds=True,
2591 shell=False, cwd="/", env=env)
2594 output = child.stdout.read(4096)
2595 child.stdout.close()
2596 except EnvironmentError, err:
2597 output += "Hook script error: %s" % str(err)
2601 result = child.wait()
2603 except EnvironmentError, err:
2604 if err.errno == errno.EINTR:
2608 # try not to leak fds
2609 for fd in (fdstdin, ):
2613 except EnvironmentError, err:
2614 # just log the error
2615 #logging.exception("Error while closing fd %s", fd)
2618 return result == 0, utils.SafeEncode(output.strip())
2620 def RunHooks(self, hpath, phase, env):
2621 """Run the scripts in the hooks directory.
2624 @param hpath: the path to the hooks directory which
2627 @param phase: either L{constants.HOOKS_PHASE_PRE} or
2628 L{constants.HOOKS_PHASE_POST}
2630 @param env: dictionary with the environment for the hook
2632 @return: list of 3-element tuples:
2634 - script result, either L{constants.HKR_SUCCESS} or
2635 L{constants.HKR_FAIL}
2636 - output of the script
2638 @raise errors.ProgrammerError: for invalid input
2642 if phase == constants.HOOKS_PHASE_PRE:
2644 elif phase == constants.HOOKS_PHASE_POST:
2647 _Fail("Unknown hooks phase '%s'", phase)
2651 subdir = "%s-%s.d" % (hpath, suffix)
2652 dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2654 dir_contents = utils.ListVisibleFiles(dir_name)
2656 # FIXME: must log output in case of failures
2659 # we use the standard python sort order,
2660 # so 00name is the recommended naming scheme
2662 for relname in dir_contents:
2663 fname = os.path.join(dir_name, relname)
2664 if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2665 self.RE_MASK.match(relname) is not None):
2666 rrval = constants.HKR_SKIP
2669 result, output = self.ExecHook(fname, env)
2671 rrval = constants.HKR_FAIL
2673 rrval = constants.HKR_SUCCESS
2674 rr.append(("%s/%s" % (subdir, relname), rrval, output))
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # feed the input data through a temporary file, removed afterwards
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
2719 class DevCacheManager(object):
2720 """Simple class for managing a cache of block device information.
2723 _DEV_PREFIX = "/dev/"
2724 _ROOT_DIR = constants.BDEV_CACHE_DIR
2727 def _ConvertPath(cls, dev_path):
2728 """Converts a /dev/name path to the cache file name.
2730 This replaces slashes with underscores and strips the /dev
2731 prefix. It then returns the full path to the cache file.
2734 @param dev_path: the C{/dev/} path name
2736 @return: the converted path name
2739 if dev_path.startswith(cls._DEV_PREFIX):
2740 dev_path = dev_path[len(cls._DEV_PREFIX):]
2741 dev_path = dev_path.replace("/", "_")
2742 fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2746 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2747 """Updates the cache information for a given device.
2750 @param dev_path: the pathname of the device
2752 @param owner: the owner (instance name) of the device
2753 @type on_primary: bool
2754 @param on_primary: whether this is the primary
2757 @param iv_name: the instance-visible name of the
2758 device, as in objects.Disk.iv_name
2763 if dev_path is None:
2764 logging.error("DevCacheManager.UpdateCache got a None dev_path")
2766 fpath = cls._ConvertPath(dev_path)
2772 iv_name = "not_visible"
2773 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2775 utils.WriteFile(fpath, data=fdata)
2776 except EnvironmentError, err:
2777 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2780 def RemoveCache(cls, dev_path):
2781 """Remove data for a dev_path.
2783 This is just a wrapper over L{utils.RemoveFile} with a converted
2784 path name and logging.
2787 @param dev_path: the pathname of the device
2792 if dev_path is None:
2793 logging.error("DevCacheManager.RemoveCache got a None dev_path")
2795 fpath = cls._ConvertPath(dev_path)
2797 utils.RemoveFile(fpath)
2798 except EnvironmentError, err:
2799 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)