4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
    the L{UploadFile} function

"""
import os
import os.path
import time
import errno
import re
import random
import logging
import zlib
import base64

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
# Kernel-provided boot identifier; its contents change on every boot, so
# reading it lets callers detect node reboots (see GetNodeInfo's "bootid")
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """
def _Fail(msg, *args, **kwargs):
  """Log an error and raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail: always, with the (interpolated) message

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data
  @raise AssertionError: if the encoding marker is unknown

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    # only plain files are removed; symlinks and subdirectories are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  @rtype: frozenset
  @return: the set of absolute paths UploadFile may write to

  """
  allowed_files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.HMAC_CLUSTER_KEY,
    ])

  # each hypervisor may declare additional ancillary files it needs uploaded
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles())

  return frozenset(allowed_files)
# Computed once at module import time; the allowed set never changes at runtime
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  # keep the lock file so the (running) daemon doesn't lose its queue lock
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
195 """Returns master information.
197 This is an utility function to compute master information, either
198 for consumption here or from the node daemon.
201 @return: master_netdev, master_ip, master_name
202 @raise RPCFail: in case of errors
207 master_netdev = cfg.GetMasterNetdev()
208 master_ip = cfg.GetMasterIP()
209 master_node = cfg.GetMasterNode()
210 except errors.ConfigurationError, err:
211 _Fail("Cluster configuration incomplete: %s", err, exc=True)
212 return (master_netdev, master_ip, master_node)
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None
  @raise RPCFail: if any of the steps failed

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  err_msgs = []
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      msg = "Someone else has the master ip, not activating"
      logging.error(msg)
      err_msgs.append(msg)
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      msg = "Can't activate master IP: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

    # announce the new IP<->MAC mapping to the network
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    daemons_params = {
      'ganeti-masterd': [],
      'ganeti-rapi': [],
      }
    if no_voting:
      daemons_params['ganeti-masterd'].append('--no-voting')
      daemons_params['ganeti-masterd'].append('--yes-do-it')
    for daemon in daemons_params:
      cmd = [daemon]
      cmd.extend(daemons_params[daemon])
      result = utils.RunCmd(cmd)
      if result.failed:
        msg = "Can't start daemon %s: %s" % (daemon, result.output)
        logging.error(msg)
        err_msgs.append(msg)

  if err_msgs:
    _Fail("; ".join(err_msgs))
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI, constants.MASTERD:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
309 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
310 """Joins this node to the cluster.
312 This does the following:
313 - updates the hostkeys of the machine (rsa and dsa)
314 - adds the ssh private key to the user
315 - adds the ssh public key to the users' authorized_keys file
318 @param dsa: the DSA private key to write
320 @param dsapub: the DSA public key to write
322 @param rsa: the RSA private key to write
324 @param rsapub: the RSA public key to write
326 @param sshkey: the SSH private key to write
328 @param sshpub: the SSH public key to write
330 @return: the success of the operation
333 sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
334 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
335 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
336 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
337 for name, content, mode in sshd_keys:
338 utils.WriteFile(name, data=content, mode=mode)
341 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
343 except errors.OpExecError, err:
344 _Fail("Error while processing user ssh files: %s", err, exc=True)
346 for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
347 utils.WriteFile(name, data=content, mode=0600)
349 utils.AddAuthorizedKey(auth_keys, sshpub)
351 utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
def LeaveCluster():
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @raise errors.QuitGanetiException: always, to tell the daemon to quit

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

    utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

    utils.RemoveFile(priv_key)
    utils.RemoveFile(pub_key)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # boot_id changes on every reboot; the master uses it to detect restarts
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return outputarray
414 def VerifyNode(what, cluster_name):
415 """Verify the status of the local node.
417 Based on the input L{what} parameter, various checks are done on the
420 If the I{filelist} key is present, this list of
421 files is checksummed and the file/checksum pairs are returned.
423 If the I{nodelist} key is present, we check that we have
424 connectivity via ssh with the target nodes (and check the hostname
427 If the I{node-net-test} key is present, we check that we have
428 connectivity to the given nodes via both primary IP and, if
429 applicable, secondary IPs.
432 @param what: a dictionary of things to check:
433 - filelist: list of files for which to compute checksums
434 - nodelist: list of nodes we should check ssh communication with
435 - node-net-test: list of nodes we should check node daemon port
437 - hypervisor: list with hypervisors to run the verify for
439 @return: a dictionary with the same keys as the input dict, and
440 values representing the result of the checks
445 if constants.NV_HYPERVISOR in what:
446 result[constants.NV_HYPERVISOR] = tmp = {}
447 for hv_name in what[constants.NV_HYPERVISOR]:
448 tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
450 if constants.NV_FILELIST in what:
451 result[constants.NV_FILELIST] = utils.FingerprintFiles(
452 what[constants.NV_FILELIST])
454 if constants.NV_NODELIST in what:
455 result[constants.NV_NODELIST] = tmp = {}
456 random.shuffle(what[constants.NV_NODELIST])
457 for node in what[constants.NV_NODELIST]:
458 success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
462 if constants.NV_NODENETTEST in what:
463 result[constants.NV_NODENETTEST] = tmp = {}
464 my_name = utils.HostInfo().name
465 my_pip = my_sip = None
466 for name, pip, sip in what[constants.NV_NODENETTEST]:
472 tmp[my_name] = ("Can't find my own primary/secondary IP"
475 port = utils.GetDaemonPort(constants.NODED)
476 for name, pip, sip in what[constants.NV_NODENETTEST]:
478 if not utils.TcpPing(pip, port, source=my_pip):
479 fail.append("primary")
481 if not utils.TcpPing(sip, port, source=my_sip):
482 fail.append("secondary")
484 tmp[name] = ("failure using the %s interface(s)" %
487 if constants.NV_LVLIST in what:
488 result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
490 if constants.NV_INSTANCELIST in what:
491 result[constants.NV_INSTANCELIST] = GetInstanceList(
492 what[constants.NV_INSTANCELIST])
494 if constants.NV_VGLIST in what:
495 result[constants.NV_VGLIST] = utils.ListVolumeGroups()
497 if constants.NV_VERSION in what:
498 result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
499 constants.RELEASE_VERSION)
501 if constants.NV_HVINFO in what:
502 hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
503 result[constants.NV_HVINFO] = hyper.GetNodeInfo()
505 if constants.NV_DRBDLIST in what:
507 used_minors = bdev.DRBD8.GetUsedDevs().keys()
508 except errors.BlockDeviceError, err:
509 logging.warning("Can't get used minors list", exc_info=True)
510 used_minors = str(err)
511 result[constants.NV_DRBDLIST] = used_minors
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # decode the lv_attr flag string (see lvs(8))
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[name] = (size, inactive, online)

  return lvs
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    # strip the "(extent)" suffix lvs appends to the device name
    return dev.split('(')[0]

  def map_line(line):
    return {
      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),
    }

  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: the list of bridge names to check
  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise
  @raise RPCFail: if any bridge is missing

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", ", ".join(missing))
631 def GetInstanceList(hypervisor_list):
632 """Provides a list of instances.
634 @type hypervisor_list: list
635 @param hypervisor_list: the list of hypervisors to query information
638 @return: a list of all running instances on the current node
639 - instance1.example.com
640 - instance2.example.com
644 for hname in hypervisor_list:
646 names = hypervisor.GetHypervisor(hname).ListInstances()
647 results.extend(names)
648 except errors.HypervisorError, err:
649 _Fail("Error enumerating instances (hypervisor %s): %s",
650 hname, err, exc=True)
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant
  @raise RPCFail: if the instance is not running or is not migratable

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # without the disk symlinks (created at instance start since 1.2.5)
  # a live migration cannot re-link the block devices on the target
  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output
def InstanceOsAdd(instance, reinstall):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @rtype: None
  @raise RPCFail: if the OS create script fails

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os)
  if reinstall:
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    # log=False: the tail of the logfile is already part of the message
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: boolean
  @return: the success of the operation
  @raise RPCFail: if the OS rename script fails

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    logging.error("os create command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    # log=False: the tail of the logfile is already part of the message
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
810 def _GetVGInfo(vg_name):
811 """Get information about the volume group.
814 @param vg_name: the volume group which we query
817 A dictionary with the following keys:
818 - C{vg_size} is the total size of the volume group in MiB
819 - C{vg_free} is the free size of the volume group in MiB
820 - C{pv_count} are the number of physical disks in that VG
822 If an error occurs during gathering of data, we return the same dict
823 with keys all set to None.
826 retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
828 retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
829 "--nosuffix", "--units=m", "--separator=:", vg_name])
832 logging.error("volume group %s not present", vg_name)
834 valarr = retval.stdout.strip().rstrip(':').split(':')
838 "vg_size": int(round(float(valarr[0]), 0)),
839 "vg_free": int(round(float(valarr[1]), 0)),
840 "pv_count": int(valarr[2]),
842 except ValueError, err:
843 logging.exception("Fail to parse vgs output: %s", err)
845 logging.error("vgs output has the wrong number of fields (expected"
846 " three): %s", str(valarr))
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the symlink path for disk I{idx} of the given instance."""
  link_basename = "%s:%d" % (instance_name, idx)
  return os.path.join(constants.DISK_LINKS_DIR, link_basename)
855 def _SymlinkBlockDev(instance_name, device_path, idx):
856 """Set up symlinks to a instance's block device.
858 This is an auxiliary function run when an instance is start (on the primary
859 node) or when an instance is migrated (on the target node).
862 @param instance_name: the name of the target instance
863 @param device_path: path of the physical block device, on the node
864 @param idx: the disk index
865 @return: absolute path to the disk's symlink
868 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
870 os.symlink(device_path, link_name)
872 if err.errno == errno.EEXIST:
873 if (not os.path.islink(link_name) or
874 os.readlink(link_name) != device_path):
876 os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @param instance_name: the name of the instance
  @param disks: list of disk objects (only their count is used)

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        # best-effort cleanup: log and continue with the other links
        logging.exception("Can't remove symlink '%s'", link_name)
896 def _GatherAndLinkBlockDevs(instance):
897 """Set up an instance's block device(s).
899 This is run on the primary node at instance startup. The block
900 devices must be already assembled.
902 @type instance: L{objects.Instance}
903 @param instance: the instance whose disks we shoul assemble
905 @return: list of (disk_object, device_path)
909 for idx, disk in enumerate(instance.disks):
910 device = _RecursiveFindBD(disk)
912 raise errors.BlockDeviceError("Block device '%s' is not set up." %
916 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
918 raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
921 block_devices.append((disk, link_name))
926 def StartInstance(instance):
927 """Start an instance.
929 @type instance: L{objects.Instance}
930 @param instance: the instance object
934 running_instances = GetInstanceList([instance.hypervisor])
936 if instance.name in running_instances:
937 logging.info("Instance %s already running, not starting", instance.name)
941 block_devices = _GatherAndLinkBlockDevs(instance)
942 hyper = hypervisor.GetHypervisor(instance.hypervisor)
943 hyper.StartInstance(instance, block_devices)
944 except errors.BlockDeviceError, err:
945 _Fail("Block device error: %s", err, exc=True)
946 except errors.HypervisorError, err:
947 _RemoveBlockDevLinks(instance.name, instance.disks)
948 _Fail("Hypervisor error: %s", err, exc=True)
951 def InstanceShutdown(instance):
952 """Shut an instance down.
954 @note: this functions uses polling with a hardcoded timeout.
956 @type instance: L{objects.Instance}
957 @param instance: the instance object
961 hv_name = instance.hypervisor
962 running_instances = GetInstanceList([hv_name])
963 iname = instance.name
965 if iname not in running_instances:
966 logging.info("Instance %s not running, doing nothing", iname)
969 hyper = hypervisor.GetHypervisor(hv_name)
971 hyper.StopInstance(instance)
972 except errors.HypervisorError, err:
973 _Fail("Failed to stop instance %s: %s", iname, err)
975 # test every 10secs for 2min
979 if instance.name not in GetInstanceList([hv_name]):
983 # the shutdown did not succeed
984 logging.error("Shutdown of '%s' unsuccessful, using destroy", iname)
987 hyper.StopInstance(instance, force=True)
988 except errors.HypervisorError, err:
989 _Fail("Failed to force stop instance %s: %s", iname, err)
992 if instance.name in GetInstanceList([hv_name]):
993 _Fail("Could not shutdown instance %s even by destroy", iname)
995 _RemoveBlockDevLinks(iname, instance.disks)
998 def InstanceReboot(instance, reboot_type):
999 """Reboot an instance.
1001 @type instance: L{objects.Instance}
1002 @param instance: the instance object to reboot
1003 @type reboot_type: str
1004 @param reboot_type: the type of reboot, one the following
1006 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1007 instance OS, do not recreate the VM
1008 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1009 restart the VM (at the hypervisor level)
1010 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1011 not accepted here, since that mode is handled differently, in
1012 cmdlib, and translates into full stop and start of the
1013 instance (instead of a call_instance_reboot RPC)
1017 running_instances = GetInstanceList([instance.hypervisor])
1019 if instance.name not in running_instances:
1020 _Fail("Cannot reboot instance %s that is not running", instance.name)
1022 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1023 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1025 hyper.RebootInstance(instance)
1026 except errors.HypervisorError, err:
1027 _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1028 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1030 InstanceShutdown(instance)
1031 return StartInstance(instance)
1032 except errors.HypervisorError, err:
1033 _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1035 _Fail("Invalid reboot_type received: %s", reboot_type)
1038 def MigrationInfo(instance):
1039 """Gather information about an instance to be migrated.
1041 @type instance: L{objects.Instance}
1042 @param instance: the instance definition
1045 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1047 info = hyper.MigrationInfo(instance)
1048 except errors.HypervisorError, err:
1049 _Fail("Failed to fetch migration information: %s", err, exc=True)
1053 def AcceptInstance(instance, info, target):
1054 """Prepare the node to accept an instance.
1056 @type instance: L{objects.Instance}
1057 @param instance: the instance definition
1058 @type info: string/data (opaque)
1059 @param info: migration information, from the source node
1060 @type target: string
1061 @param target: target host (usually ip), on this node
1064 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1066 hyper.AcceptInstance(instance, info, target)
1067 except errors.HypervisorError, err:
1068 _Fail("Failed to accept instance: %s", err, exc=True)
1071 def FinalizeMigration(instance, info, success):
1072 """Finalize any preparation to accept an instance.
1074 @type instance: L{objects.Instance}
1075 @param instance: the instance definition
1076 @type info: string/data (opaque)
1077 @param info: migration information, from the source node
1078 @type success: boolean
1079 @param success: whether the migration was a success or a failure
1082 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1084 hyper.FinalizeMigration(instance, info, success)
1085 except errors.HypervisorError, err:
1086 _Fail("Failed to finalize migration: %s", err, exc=True)
1089 def MigrateInstance(instance, target, live):
1090 """Migrates an instance to another node.
1092 @type instance: L{objects.Instance}
1093 @param instance: the instance definition
1094 @type target: string
1095 @param target: the target node name
1097 @param live: whether the migration should be done live or not (the
1098 interpretation of this parameter is left to the hypervisor)
1100 @return: a tuple of (success, msg) where:
1101 - succes is a boolean denoting the success/failure of the operation
1102 - msg is a string with details in case of failure
1105 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1108 hyper.MigrateInstance(instance.name, target, live)
1109 except errors.HypervisorError, err:
1110 _Fail("Failed to migrate instance: %s", err, exc=True)
1113 def BlockdevCreate(disk, size, owner, on_primary, info):
1114 """Creates a block device for an instance.
1116 @type disk: L{objects.Disk}
1117 @param disk: the object describing the disk we should create
1119 @param size: the size of the physical underlying device, in MiB
1121 @param owner: the name of the instance for which disk is created,
1122 used for device cache data
1123 @type on_primary: boolean
1124 @param on_primary: indicates if it is the primary node or not
1126 @param info: string that will be sent to the physical device
1127 creation, used for example to set (LVM) tags on LVs
1129 @return: the new unique_id of the device (this can sometime be
1130 computed only after creation), or None. On secondary nodes,
1131 it's not required to return anything.
1136 for child in disk.children:
1138 crdev = _RecursiveAssembleBD(child, owner, on_primary)
1139 except errors.BlockDeviceError, err:
1140 _Fail("Can't assemble device %s: %s", child, err)
1141 if on_primary or disk.AssembleOnSecondary():
1142 # we need the children open in case the device itself has to
1146 except errors.BlockDeviceError, err:
1147 _Fail("Can't make child '%s' read-write: %s", child, err)
1151 device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1152 except errors.BlockDeviceError, err:
1153 _Fail("Can't create block device: %s", err)
1155 if on_primary or disk.AssembleOnSecondary():
1158 except errors.BlockDeviceError, err:
1159 _Fail("Can't assemble device after creation, unusual event: %s", err)
1160 device.SetSyncSpeed(constants.SYNC_SPEED)
1161 if on_primary or disk.OpenOnSecondary():
1163 device.Open(force=True)
1164 except errors.BlockDeviceError, err:
1165 _Fail("Can't make device r/w after creation, unusual event: %s", err)
1166 DevCacheManager.UpdateCache(device.dev_path, owner,
1167 on_primary, disk.iv_name)
1169 device.SetInfo(info)
1171 return device.unique_id
1174 def BlockdevRemove(disk):
1175   """Remove a block device.
1177   @note: This is intended to be called recursively.
1179   @type disk: L{objects.Disk}
1180   @param disk: the disk object we should remove
1182   @return: the success of the operation
# An attach failure is tolerated: a device that cannot be found is
# treated as already gone and only logged, not raised.
1187     rdev = _RecursiveFindBD(disk)
1188   except errors.BlockDeviceError, err:
1189     # probably can't attach
1190     logging.info("Can't attach to device %s in remove", disk)
1192   if rdev is not None:
1193     r_path = rdev.dev_path
1196     except errors.BlockDeviceError, err:
1197       msgs.append(str(err))
# Drop the removed device's path from the device cache.
1199       DevCacheManager.RemoveCache(r_path)
# Recurse into all children; their RPCFail messages are collected and
# re-raised as one combined failure below.
1202   for child in disk.children:
1204       BlockdevRemove(child)
1205     except RPCFail, err:
1206       msgs.append(str(err))
1209     _Fail("; ".join(msgs))
1212 def _RecursiveAssembleBD(disk, owner, as_primary):
1213   """Activate a block device for an instance.
1215   This is run on the primary and secondary nodes for an instance.
1217   @note: this function is called recursively.
1219   @type disk: L{objects.Disk}
1220   @param disk: the disk we try to assemble
1222   @param owner: the name of the instance which owns the disk
1223   @type as_primary: boolean
1224   @param as_primary: if we should make the block device
1227   @return: the assembled device or None (in case no device
1229   @raise errors.BlockDeviceError: in case there is an error
1230   during the activation of the children or the device
# mcn ends up holding the maximum number of children that may fail to
# assemble (i.e. be None) while the device can still be built.
1236     mcn = disk.ChildrenNeeded()
1238     mcn = 0 # max number of Nones allowed
1240     mcn = len(disk.children) - mcn # max number of Nones
1241   for chld_disk in disk.children:
1243       cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1244     except errors.BlockDeviceError, err:
# Too many failed children: re-raise; otherwise log and continue with
# a None placeholder appended below.
1245       if children.count(None) >= mcn:
1248         logging.error("Error in child activation (but continuing): %s",
1250     children.append(cdev)
1252   if as_primary or disk.AssembleOnSecondary():
1253     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1254     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1256     if as_primary or disk.OpenOnSecondary():
# Record the assembled device path in the node-local device cache.
1258       DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1259                                   as_primary, disk.iv_name)
1266 def BlockdevAssemble(disk, owner, as_primary):
1267   """Activate a block device for an instance.
1269   This is a wrapper over _RecursiveAssembleBD.
1271   @rtype: str or boolean
1272   @return: a C{/dev/...} path for primary nodes, and
1273   C{True} for secondary nodes
1277     result = _RecursiveAssembleBD(disk, owner, as_primary)
# On primary nodes the helper returns a BlockDev instance; expose only
# its device path to the (RPC) caller.
1278     if isinstance(result, bdev.BlockDev):
1279       result = result.dev_path
1280   except errors.BlockDeviceError, err:
1281     _Fail("Error while assembling disk: %s", err, exc=True)
1286 def BlockdevShutdown(disk):
1287   """Shut down a block device.
1289   First, if the device is assembled (Attach() is successful), then
1290   the device is shutdown. Then the children of the device are
1293   This function is called recursively. Note that we don't cache the
1294   children or such, as opposed to assemble, shutdown of different
1295   devices doesn't require that the upper device was active.
1297   @type disk: L{objects.Disk}
1298   @param disk: the description of the disk we should
1304   r_dev = _RecursiveFindBD(disk)
1305   if r_dev is not None:
1306     r_path = r_dev.dev_path
# Successful shutdown also evicts the path from the device cache.
1309       DevCacheManager.RemoveCache(r_path)
1310     except errors.BlockDeviceError, err:
1311       msgs.append(str(err))
# Shut down children as well, accumulating error messages; all
# collected errors are reported together via _Fail below.
1314   for child in disk.children:
1316       BlockdevShutdown(child)
1317     except RPCFail, err:
1318       msgs.append(str(err))
1321     _Fail("; ".join(msgs))
1324 def BlockdevAddchildren(parent_cdev, new_cdevs):
1325   """Extend a mirrored block device.
1327   @type parent_cdev: L{objects.Disk}
1328   @param parent_cdev: the disk to which we should add children
1329   @type new_cdevs: list of L{objects.Disk}
1330   @param new_cdevs: the list of children which we should add
1334   parent_bdev = _RecursiveFindBD(parent_cdev)
1335   if parent_bdev is None:
1336     _Fail("Can't find parent device '%s' in add children", parent_cdev)
# _RecursiveFindBD returns None for unfindable disks, so any None in
# the list means at least one new child device is missing.
1337   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1338   if new_bdevs.count(None) > 0:
1339     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1340   parent_bdev.AddChildren(new_bdevs)
1343 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1344   """Shrink a mirrored block device.
1346   @type parent_cdev: L{objects.Disk}
1347   @param parent_cdev: the disk from which we should remove children
1348   @type new_cdevs: list of L{objects.Disk}
1349   @param new_cdevs: the list of children which we should remove
1353   parent_bdev = _RecursiveFindBD(parent_cdev)
1354   if parent_bdev is None:
1355     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
# For each child, prefer its static device path; otherwise fall back
# to locating the live block device and using its runtime path.
1357   for disk in new_cdevs:
1358     rpath = disk.StaticDevPath()
1360       bd = _RecursiveFindBD(disk)
1362         _Fail("Can't find device %s while removing children", disk)
1364         devs.append(bd.dev_path)
1367   parent_bdev.RemoveChildren(devs)
1370 def BlockdevGetmirrorstatus(disks):
1371   """Get the mirroring status of a list of devices.
1373   @type disks: list of L{objects.Disk}
1374   @param disks: the list of disks which we should query
1377   a list of (mirror_done, estimated_time) tuples, which
1378   are the result of L{bdev.BlockDev.CombinedSyncStatus}
1379   @raise errors.BlockDeviceError: if any of the disks cannot be
# Any disk that cannot be found aborts the whole query via _Fail;
# results are appended in the same order as the input list.
1385     rbd = _RecursiveFindBD(dsk)
1387       _Fail("Can't find device %s", dsk)
1389     stats.append(rbd.CombinedSyncStatus())
1394 def _RecursiveFindBD(disk):
1395   """Check if a device is activated.
1397   If so, return information about the real device.
1399   @type disk: L{objects.Disk}
1400   @param disk: the disk object we need to find
1402   @return: None if the device can't be found,
1403   otherwise the device instance
# Resolve all children first, then ask the bdev layer to locate the
# top-level device built on top of them.
1408     for chdisk in disk.children:
1409       children.append(_RecursiveFindBD(chdisk))
1411   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1414 def BlockdevFind(disk):
1415   """Check if a device is activated.
1417   If it is, return information about the real device.
1419   @type disk: L{objects.Disk}
1420   @param disk: the disk to find
1421   @rtype: None or objects.BlockDevStatus
1422   @return: None if the disk cannot be found, otherwise the current
1427     rbd = _RecursiveFindBD(disk)
1428   except errors.BlockDeviceError, err:
1429     _Fail("Failed to find device: %s", err, exc=True)
# Report the device's synchronization status back to the caller.
1434   return rbd.GetSyncStatus()
1437 def BlockdevGetsize(disks):
1438   """Computes the size of the given disks.
1440   If a disk is not found, returns None instead.
1442   @type disks: list of L{objects.Disk}
1443   @param disks: the list of disk to compute the size for
1445   @return: list with elements None if the disk cannot be found,
# Per-disk lookup errors do not abort the whole call; the result list
# keeps one entry per input disk.
1452       rbd = _RecursiveFindBD(cf)
1453     except errors.BlockDeviceError, err:
1459     result.append(rbd.GetActualSize())
1463 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1464   """Export a block device to a remote node.
1466   @type disk: L{objects.Disk}
1467   @param disk: the description of the disk to export
1468   @type dest_node: str
1469   @param dest_node: the destination node to export to
1470   @type dest_path: str
1471   @param dest_path: the destination path on the target node
1472   @type cluster_name: str
1473   @param cluster_name: the cluster name, needed for SSH hostalias
1477   real_disk = _RecursiveFindBD(disk)
1478   if real_disk is None:
1479     _Fail("Block device '%s' is not set up", disk)
1483   # the block size on the read dd is 1MiB to match our units
# Build the reader pipeline; pipefail ensures a failing dd is not
# masked by a later pipeline stage.
1484   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1485                                "dd if=%s bs=1048576 count=%s",
1486                                real_disk.dev_path, str(disk.size))
1488   # we set here a smaller block size as, due to ssh buffering, more
1489   # than 64-128k will mostly ignored; we use nocreat to fail if the
1490   # device is not already there or we pass a wrong path; we use
1491   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1492   # to not buffer too much memory; this means that at best, we flush
1493   # every 64k, which will not be very fast
1494   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1495                                 " oflag=dsync", dest_path)
# The write side runs on the destination node over SSH, as the Ganeti
# run-as user.
1497   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1498                                                    constants.GANETI_RUNAS,
1501   # all commands have been checked, so we're safe to combine them
1502   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1504   result = utils.RunCmd(["bash", "-c", command])
1507     _Fail("Disk copy command '%s' returned error: %s"
1508           " output: %s", command, result.fail_reason, result.output)
1511 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1512   """Write a file to the filesystem.
1514   This allows the master to overwrite(!) a file. It will only perform
1515   the operation if the file belongs to a list of configuration files.
1517   @type file_name: str
1518   @param file_name: the target file name
1520   @param data: the new contents of the file
1522   @param mode: the mode to give the file (can be None)
1524   @param uid: the owner of the file (can be -1 for default)
1526   @param gid: the group of the file (can be -1 for default)
1528   @param atime: the atime to set on the file (can be None)
1530   @param mtime: the mtime to set on the file (can be None)
# Security gate: only absolute paths from the module-level whitelist
# (_ALLOWED_UPLOAD_FILES, see module docstring) may be overwritten.
1534   if not os.path.isabs(file_name):
1535     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1537   if file_name not in _ALLOWED_UPLOAD_FILES:
1538     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
# Payload arrives (possibly) compressed from the master; decompress
# before writing.
1541   raw_data = _Decompress(data)
1543   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1544                   atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Thin wrapper around L{ssconf.SimpleStore.WriteFiles}.

  """
  sstore = ssconf.SimpleStore()
  sstore.WriteFiles(values)
1556 def _ErrnoOrStr(err):
1557   """Format an EnvironmentError exception.
1559   If the L{err} argument has an errno attribute, it will be looked up
1560   and converted into a textual C{E...} description. Otherwise the
1561   string representation of the error will be returned.
1563   @type err: L{EnvironmentError}
1564   @param err: the exception to format
# Translate the numeric errno into its symbolic name (e.g. ENOENT)
# via the stdlib errno.errorcode mapping.
1567   if hasattr(err, 'errno'):
1568     detail = errno.errorcode[err.errno]
1574 def _OSOndiskAPIVersion(name, os_dir):
1575   """Compute and return the API version of a given OS.
1577   This function will try to read the API version of the OS given by
1578   the 'name' parameter and residing in the 'os_dir' directory.
1581   @param name: the OS name we should look for
1583   @param os_dir: the directory in which we should look for the OS
1585   @return: tuple (status, data) with status denoting the validity and
1586   data holding either the valid versions or an error message
1589   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1592     st = os.stat(api_file)
1593   except EnvironmentError, err:
1594     return False, ("Required file 'ganeti_api_version' file not"
1595                    " found under path %s: %s" % (os_dir, _ErrnoOrStr(err)))
# The version file must be a regular file, not e.g. a directory or
# device node.
1597   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1598     return False, ("File 'ganeti_api_version' file at %s is not"
1599                    " a regular file" % os_dir)
1602     api_versions = utils.ReadFile(api_file).splitlines()
1603   except EnvironmentError, err:
1604     return False, ("Error while reading the API version file at %s: %s" %
1605                    (api_file, _ErrnoOrStr(err)))
# Each line of the file is expected to hold one integer API version.
1608     api_versions = [int(version.strip()) for version in api_versions]
1609   except (TypeError, ValueError), err:
1610     return False, ("API version(s) can't be converted to integer: %s" %
1613   return True, api_versions
1616 def DiagnoseOS(top_dirs=None):
1617   """Compute the validity for all OSes.
1619   @type top_dirs: list
1620   @param top_dirs: the list of directories in which to
1621   search (if not given defaults to
1622   L{constants.OS_SEARCH_PATH})
1623   @rtype: list of L{objects.OS}
1624   @return: a list of tuples (name, path, status, diagnose)
1625   for all (potential) OSes under all search paths, where:
1626   - name is the (potential) OS name
1627   - path is the full path to the OS
1628   - status True/False is the validity of the OS
1629   - diagnose is the error message for an invalid OS, otherwise empty
1632   if top_dirs is None:
1633     top_dirs = constants.OS_SEARCH_PATH
# Walk every search directory; unreadable directories are logged and
# skipped, not fatal.
1636   for dir_name in top_dirs:
1637     if os.path.isdir(dir_name):
1639         f_names = utils.ListVisibleFiles(dir_name)
1640       except EnvironmentError, err:
1641         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1643       for name in f_names:
1644         os_path = os.path.sep.join([dir_name, name])
# Probe each candidate entry as an OS definition; status/diagnose are
# derived from the probe result.
1645         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1650         result.append((name, os_path, status, diagnose))
1655 def _TryOSFromDisk(name, base_dir=None):
1656   """Create an OS instance from disk.
1658   This function will return an OS instance if the given name is a
1661   @type base_dir: string
1662   @keyword base_dir: Base directory containing OS installations.
1663   Defaults to a search in all the OS_SEARCH_PATH dirs.
1665   @return: success and either the OS instance if we find a valid one,
1669   if base_dir is None:
1670     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1672       return False, "Directory for OS %s not found in search path" % name
1674     os_dir = os.path.sep.join([base_dir, name])
1676   status, api_versions = _OSOndiskAPIVersion(name, os_dir)
# On failure, api_versions already carries the error message from
# _OSOndiskAPIVersion.
1679     return status, api_versions
# The OS is usable only if it supports at least one API version that
# this node also supports.
1681   if not constants.OS_API_VERSIONS.intersection(api_versions):
1682     return False, ("API version mismatch for path '%s': found %s, want %s." %
1683                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1685   # OS Scripts dictionary, we will populate it with the actual script names
1686   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1688   for script in os_scripts:
1689     os_scripts[script] = os.path.sep.join([os_dir, script])
1692       st = os.stat(os_scripts[script])
1693     except EnvironmentError, err:
1694       return False, ("Script '%s' under path '%s' is missing (%s)" %
1695                      (script, os_dir, _ErrnoOrStr(err)))
# Every OS script must be a regular file and owner-executable.
1697     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1698       return False, ("Script '%s' under path '%s' is not executable" %
1701     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1702       return False, ("Script '%s' under path '%s' is not a regular file" %
1705   os_obj = objects.OS(name=name, path=os_dir,
1706                       create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1707                       export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1708                       import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1709                       rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1710                       api_versions=api_versions)
1714 def OSFromDisk(name, base_dir=None):
1715   """Create an OS instance from disk.
1717   This function will return an OS instance if the given name is a
1718   valid OS name. Otherwise, it will raise an appropriate
1719   L{RPCFail} exception, detailing why this is not a valid OS.
1721   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1722   an exception but returns true/false status data.
1724   @type base_dir: string
1725   @keyword base_dir: Base directory containing OS installations.
1726   Defaults to a search in all the OS_SEARCH_PATH dirs.
1727   @rtype: L{objects.OS}
1728   @return: the OS instance if we find a valid one
1729   @raise RPCFail: if we don't find a valid OS
# Delegate detection to _TryOSFromDisk; payload is either the OS
# object or an error message depending on status.
1732   status, payload = _TryOSFromDisk(name, base_dir)
1740 def OSEnvironment(instance, os, debug=0):
1741   """Calculate the environment for an os script.
1743   @type instance: L{objects.Instance}
1744   @param instance: target instance for the os script run
1745   @type os: L{objects.OS}
1746   @param os: operating system for which the environment is being built
1747   @type debug: integer
1748   @param debug: debug level (0 or 1, for OS Api 10)
1750   @return: dict of environment variables
1751   @raise errors.BlockDeviceError: if the block device
# Advertise the highest API version supported by both this node and
# the OS definition.
1756   api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
1757   result['OS_API_VERSION'] = '%d' % api_version
1758   result['INSTANCE_NAME'] = instance.name
1759   result['INSTANCE_OS'] = instance.os
1760   result['HYPERVISOR'] = instance.hypervisor
1761   result['DISK_COUNT'] = '%d' % len(instance.disks)
1762   result['NIC_COUNT'] = '%d' % len(instance.nics)
1763   result['DEBUG_LEVEL'] = '%d' % debug
# Per-disk variables: path, access mode, optional frontend type from
# hvparams, and backend type (block vs. file-backed).
1764   for idx, disk in enumerate(instance.disks):
1765     real_disk = _RecursiveFindBD(disk)
1766     if real_disk is None:
1767       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1770     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1771     result['DISK_%d_ACCESS' % idx] = disk.mode
1772     if constants.HV_DISK_TYPE in instance.hvparams:
1773       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1774         instance.hvparams[constants.HV_DISK_TYPE]
1775     if disk.dev_type in constants.LDS_BLOCK:
1776       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1777     elif disk.dev_type == constants.LD_FILE:
1778       result['DISK_%d_BACKEND_TYPE' % idx] = \
1779         'file:%s' % disk.physical_id[0]
# Per-NIC variables: MAC, IP, mode, bridge/link, optional frontend
# type from hvparams.
1780   for idx, nic in enumerate(instance.nics):
1781     result['NIC_%d_MAC' % idx] = nic.mac
1783       result['NIC_%d_IP' % idx] = nic.ip
1784     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1785     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1786       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1787     if nic.nicparams[constants.NIC_LINK]:
1788       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1789     if constants.HV_NIC_TYPE in instance.hvparams:
1790       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1791         instance.hvparams[constants.HV_NIC_TYPE]
# Finally expose all backend and hypervisor parameters verbatim.
1793   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1794     for key, value in source.items():
1795       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1799 def BlockdevGrow(disk, amount):
1800   """Grow a stack of block devices.
1802   This function is called recursively, with the childrens being the
1803   first ones to resize.
1805   @type disk: L{objects.Disk}
1806   @param disk: the disk to be grown
1807   @rtype: (status, result)
1808   @return: a tuple with the status of the operation
1809   (True/False), and the errors message if status
1813   r_dev = _RecursiveFindBD(disk)
1815     _Fail("Cannot find block device %s", disk)
1819   except errors.BlockDeviceError, err:
1820     _Fail("Failed to grow block device: %s", err, exc=True)
1823 def BlockdevSnapshot(disk):
1824   """Create a snapshot copy of a block device.
1826   This function is called recursively, and the snapshot is actually created
1827   just for the leaf lvm backend device.
1829   @type disk: L{objects.Disk}
1830   @param disk: the disk to be snapshotted
1832   @return: snapshot disk path
# Non-LV devices (e.g. DRBD) are descended: recurse into the single
# child, or pick the child that matches the parent's size.
1836     if len(disk.children) == 1:
1837       # only one child, let's recurse on it
1838       return BlockdevSnapshot(disk.children[0])
1840       # more than one child, choose one that matches
1841       for child in disk.children:
1842         if child.size == disk.size:
1843           # return implies breaking the loop
1844           return BlockdevSnapshot(child)
# Leaf case: only LVM logical volumes can actually be snapshotted.
1845   elif disk.dev_type == constants.LD_LV:
1846     r_dev = _RecursiveFindBD(disk)
1847     if r_dev is not None:
1848       # let's stay on the safe side and ask for the full size, for now
1849       return r_dev.Snapshot(disk.size)
1851       _Fail("Cannot find block device %s", disk)
1853     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1854           disk.unique_id, disk.dev_type)
1857 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1858   """Export a block device snapshot to a remote node.
1860   @type disk: L{objects.Disk}
1861   @param disk: the description of the disk to export
1862   @type dest_node: str
1863   @param dest_node: the destination node to export to
1864   @type instance: L{objects.Instance}
1865   @param instance: the instance object to whom the disk belongs
1866   @type cluster_name: str
1867   @param cluster_name: the cluster name, needed for SSH hostalias
1869   @param idx: the index of the disk in the instance's disk list,
1870   used to export to the OS scripts environment
1874   inst_os = OSFromDisk(instance.os)
1875   export_env = OSEnvironment(instance, inst_os)
1877   export_script = inst_os.export_script
# Per-run log file under LOG_OS_DIR; the directory is created lazily
# with mode 0750.
1879   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1880                                      instance.name, int(time.time()))
1881   if not os.path.exists(constants.LOG_OS_DIR):
1882     os.mkdir(constants.LOG_OS_DIR, 0750)
1883   real_disk = _RecursiveFindBD(disk)
1884   if real_disk is None:
1885     _Fail("Block device '%s' is not set up", disk)
# Tell the OS export script which device and disk index to dump.
1889   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1890   export_env['EXPORT_INDEX'] = str(idx)
1892   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1893   destfile = disk.physical_id[1]
1895   # the target command is built out of three individual commands,
1896   # which are joined by pipes; we check each individual command for
1898   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1899                                inst_os.path, export_script, logfile)
1903   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1904                                 destdir, destdir, destfile)
1905   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1906                                                    constants.GANETI_RUNAS,
1909   # all commands have been checked, so we're safe to combine them
1910   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1912   result = utils.RunCmd(["bash", "-c", command], env=export_env)
1915     _Fail("OS snapshot export command '%s' returned error: %s"
1916           " output: %s", command, result.fail_reason, result.output)
1919 def FinalizeExport(instance, snap_disks):
1920   """Write out the export configuration information.
1922   @type instance: L{objects.Instance}
1923   @param instance: the instance which we export, used for
1924   saving configuration
1925   @type snap_disks: list of L{objects.Disk}
1926   @param snap_disks: list of snapshot block devices, which
1927   will be used to get the actual name of the dump file
# The export is first staged into "<name>.new" and only atomically
# moved over the final directory at the very end.
1932   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1933   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1935   config = objects.SerializableConfigParser()
# Export-wide metadata section.
1937   config.add_section(constants.INISECT_EXP)
1938   config.set(constants.INISECT_EXP, 'version', '0')
1939   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1940   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1941   config.set(constants.INISECT_EXP, 'os', instance.os)
1942   config.set(constants.INISECT_EXP, 'compression', 'gzip')
# Per-instance section: name, sizing, and the NIC/disk inventory.
1944   config.add_section(constants.INISECT_INS)
1945   config.set(constants.INISECT_INS, 'name', instance.name)
1946   config.set(constants.INISECT_INS, 'memory', '%d' %
1947              instance.beparams[constants.BE_MEMORY])
1948   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1949              instance.beparams[constants.BE_VCPUS])
1950   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1953   for nic_count, nic in enumerate(instance.nics):
1955     config.set(constants.INISECT_INS, 'nic%d_mac' %
1956                nic_count, '%s' % nic.mac)
1957     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1958     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1960   # TODO: redundant: on load can read nics until it doesn't exist
1961   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1964   for disk_count, disk in enumerate(snap_disks):
1967     config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1968                ('%s' % disk.iv_name))
1969     config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1970                ('%s' % disk.physical_id[1]))
1971     config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1974   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1976   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1977                   data=config.Dumps())
# Replace any previous export for this instance with the staged one.
1978   shutil.rmtree(finaldestdir, True)
1979   shutil.move(destdir, finaldestdir)
1982 def ExportInfo(dest):
1983   """Get export configuration information.
1986   @param dest: directory containing the export
1988   @rtype: L{objects.SerializableConfigParser}
1989   @return: a serializable config file containing the
1993   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1995   config = objects.SerializableConfigParser()
# A valid export must contain both the export and the instance
# sections written by FinalizeExport.
1998   if (not config.has_section(constants.INISECT_EXP) or
1999       not config.has_section(constants.INISECT_INS)):
2000     _Fail("Export info file doesn't have the required fields")
2002   return config.Dumps()
2005 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2006   """Import an os image into an instance.
2008   @type instance: L{objects.Instance}
2009   @param instance: instance to import the disks into
2010   @type src_node: string
2011   @param src_node: source node for the disk images
2012   @type src_images: list of string
2013   @param src_images: absolute paths of the disk images
2014   @rtype: list of boolean
2015   @return: each boolean represent the success of importing the n-th disk
2018   inst_os = OSFromDisk(instance.os)
2019   import_env = OSEnvironment(instance, inst_os)
2020   import_script = inst_os.import_script
2022   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2023                                         instance.name, int(time.time()))
2024   if not os.path.exists(constants.LOG_OS_DIR):
2025     os.mkdir(constants.LOG_OS_DIR, 0750)
# The import side of the pipeline: run the OS import script from the
# OS directory, capturing all its output in the log file.
2028   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2029                                import_script, logfile)
# One pipeline per disk image: cat on the source node over SSH,
# decompress, then feed the OS import script.
2032   for idx, image in enumerate(src_images):
2034       destcmd = utils.BuildShellCmd('cat %s', image)
2035       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2036                                                        constants.GANETI_RUNAS,
2038       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2039       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2040       import_env['IMPORT_INDEX'] = str(idx)
2041       result = utils.RunCmd(command, env=import_env)
# Failures are collected per disk and reported together at the end;
# log=False since the details were already logged above.
2043         logging.error("Disk import command '%s' returned error: %s"
2044                       " output: %s", command, result.fail_reason,
2046         final_result.append("error importing disk %d: %s, %s" %
2047                             (idx, result.fail_reason, result.output[-100]))
2050     _Fail("; ".join(final_result), log=False)
2054   """Return a list of exports currently available on this machine.
2057   @return: list of the exports
# Exports are simply the visible entries of EXPORT_DIR; a missing
# directory is reported as an RPC failure.
2060   if os.path.isdir(constants.EXPORT_DIR):
2061     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2063     _Fail("No exports directory")
2066 def RemoveExport(export):
2067   """Remove an existing export from the node.
2070   @param export: the name of the export to remove
# The export name is resolved relative to EXPORT_DIR and deleted
# recursively.
2074   target = os.path.join(constants.EXPORT_DIR, export)
2077     shutil.rmtree(target)
2078   except EnvironmentError, err:
2079     _Fail("Error while removing the export: %s", err, exc=True)
2082 def BlockdevRename(devlist):
2083   """Rename a list of block devices.
2085   @type devlist: list of tuples
2086   @param devlist: list of tuples of the form (disk,
2087   new_logical_id, new_physical_id); disk is an
2088   L{objects.Disk} object describing the current disk,
2089   and new logical_id/physical_id is the name we
2092   @return: True if all renames succeeded, False otherwise
2097   for disk, unique_id in devlist:
2098     dev = _RecursiveFindBD(disk)
2100       msgs.append("Can't find device %s in rename" % str(disk))
2104       old_rpath = dev.dev_path
2105       dev.Rename(unique_id)
2106       new_rpath = dev.dev_path
# If the device path changed, the old cache entry is now stale.
2107       if old_rpath != new_rpath:
2108         DevCacheManager.RemoveCache(old_rpath)
2109         # FIXME: we should add the new cache information here, like:
2110         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2111         # but we don't have the owner here - maybe parse from existing
2112         # cache? for now, we only lose lvm data when we rename, which
2113         # is less critical than DRBD or MD
2114     except errors.BlockDeviceError, err:
2115       msgs.append("Can't rename device '%s' to '%s': %s" %
2116                   (dev, unique_id, err))
2117       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2120     _Fail("; ".join(msgs))
2123 def _TransformFileStorageDir(file_storage_dir):
2124   """Checks whether given file_storage_dir is valid.
2126   Checks whether the given file_storage_dir is within the cluster-wide
2127   default file_storage_dir stored in SimpleStore. Only paths under that
2128   directory are allowed.
2130   @type file_storage_dir: str
2131   @param file_storage_dir: the path to check
2133   @return: the normalized path if valid, None otherwise
2137   file_storage_dir = os.path.normpath(file_storage_dir)
2138   base_file_storage_dir = cfg.GetFileStorageDir()
# NOTE(review): os.path.commonprefix is a string-wise prefix check,
# not component-wise — e.g. "/srv/ganeti-x" would pass against base
# "/srv/ganeti"; confirm whether this is acceptable here.
2139   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2140       base_file_storage_dir):
2141     _Fail("File storage directory '%s' is not under base file"
2142           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2143   return file_storage_dir
2146 def CreateFileStorageDir(file_storage_dir):
2147   """Create file storage directory.
2149   @type file_storage_dir: str
2150   @param file_storage_dir: directory to create
2153   @return: tuple with first element a boolean indicating whether dir
2154   creation was successful or not
# Validate the path against the cluster's base storage dir first.
2157   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2158   if os.path.exists(file_storage_dir):
# An existing non-directory at the target path is an error; an
# existing directory is implicitly accepted.
2159     if not os.path.isdir(file_storage_dir):
2160       _Fail("Specified storage dir '%s' is not a directory",
2164       os.makedirs(file_storage_dir, 0750)
2165     except OSError, err:
2166       _Fail("Cannot create file storage directory '%s': %s",
2167             file_storage_dir, err, exc=True)
2170 def RemoveFileStorageDir(file_storage_dir):
2171   """Remove file storage directory.
2173   Remove it only if it's empty. If not log an error and return.
2175   @type file_storage_dir: str
2176   @param file_storage_dir: the directory we should cleanup
2177   @rtype: tuple (success,)
2178   @return: tuple of one element, C{success}, denoting
2179   whether the operation was successful
2182   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2183   if os.path.exists(file_storage_dir):
2184     if not os.path.isdir(file_storage_dir):
2185       _Fail("Specified Storage directory '%s' is not a directory",
2187     # deletes dir only if empty, otherwise we want to fail the rpc call
2189       os.rmdir(file_storage_dir)
2190     except OSError, err:
2191       _Fail("Cannot remove file storage directory '%s': %s",
2192             file_storage_dir, err)
2195 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2196   """Rename the file storage directory.
2198   @type old_file_storage_dir: str
2199   @param old_file_storage_dir: the current path
2200   @type new_file_storage_dir: str
2201   @param new_file_storage_dir: the name we should rename to
2202   @rtype: tuple (success,)
2203   @return: tuple of one element, C{success}, denoting
2204   whether the operation was successful
# Both ends of the rename must be valid storage paths under the
# cluster's base file storage directory.
2207   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2208   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2209   if not os.path.exists(new_file_storage_dir):
2210     if os.path.isdir(old_file_storage_dir):
2212         os.rename(old_file_storage_dir, new_file_storage_dir)
2213       except OSError, err:
2214         _Fail("Cannot rename '%s' to '%s': %s",
2215               old_file_storage_dir, new_file_storage_dir, err)
2217       _Fail("Specified storage dir '%s' is not a directory",
2218             old_file_storage_dir)
# Target already exists; only fail if the source also still exists
# (i.e. the rename genuinely cannot proceed).
2220     if os.path.exists(old_file_storage_dir):
2221       _Fail("Cannot rename '%s' to '%s': both locations exist",
2222             old_file_storage_dir, new_file_storage_dir)
2225 def _EnsureJobQueueFile(file_name):
2226   """Checks whether the given filename is in the queue directory.
2228   @type file_name: str
2229   @param file_name: the file name we should check
2231   @raises RPCFail: if the file is not valid
# NOTE(review): string-wise commonprefix check, same caveat as in
# _TransformFileStorageDir — confirm sibling dirs with a shared name
# prefix cannot slip through.
2234   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2235   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2238     _Fail("Passed job queue file '%s' does not belong to"
2239           " the queue directory '%s'", file_name, queue_dir)
2242 def JobQueueUpdate(file_name, content):
2243   """Updates a file in the queue directory.
2245   This is just a wrapper over L{utils.WriteFile}, with proper
2248   @type file_name: str
2249   @param file_name: the job file name
2251   @param content: the new job contents
2253   @return: the success of the operation
# Reject paths outside the queue directory before touching the disk.
2256   _EnsureJobQueueFile(file_name)
2258   # Write and replace the file atomically
2259   utils.WriteFile(file_name, data=_Decompress(content))
2262 def JobQueueRename(old, new):
2263   """Renames a job queue file.
2265   This is just a wrapper over os.rename with proper checking.
2268   @param old: the old (actual) file name
2270   @param new: the desired file name
2272   @return: the success of the operation and payload
# Both source and destination must live inside the queue directory.
2275   _EnsureJobQueueFile(old)
2276   _EnsureJobQueueFile(new)
# mkdir=True creates missing parent directories of the target.
2278   utils.RenameFile(old, new, mkdir=True)
2281 def JobQueueSetDrainFlag(drain_flag):
2282   """Set the drain flag for the queue.
2284   This will set or unset the queue drain flag.
2286   @type drain_flag: boolean
2287   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2289   @return: always True, None
2290   @warning: the function always returns True
# The drain flag is represented by the mere existence of an empty
# marker file on disk.
2294     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2296     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2299 def BlockdevClose(instance_name, disks):
2300   """Closes the given block devices.
2302   This means they will be switched to secondary mode (in case of
2305   @param instance_name: if the argument is not empty, the symlinks
2306   of this instance will be removed
2307   @type disks: list of L{objects.Disk}
2308   @param disks: the list of disks to be closed
2309   @rtype: tuple (success, message)
2310   @return: a tuple of success and message, where success
2311   indicates the success of the operation, and message
2312   which will contain the error details in case we
2318     rd = _RecursiveFindBD(cf)
2320       _Fail("Can't find device %s", cf)
# Per-device errors are collected and reported as one failure.
2327     except errors.BlockDeviceError, err:
2328       msg.append(str(err))
2330     _Fail("Can't make devices secondary: %s", ",".join(msg))
# Only remove the instance's device symlinks when a name was given.
2333     _RemoveBlockDevLinks(instance_name, disks)
2336 def ValidateHVParams(hvname, hvparams):
2337 """Validates the given hypervisor parameters.
2339 @type hvname: string
2340 @param hvname: the hypervisor name
2341 @type hvparams: dict
2342 @param hvparams: the hypervisor parameters to be validated
2347 hv_type = hypervisor.GetHypervisor(hvname)
2348 hv_type.ValidateParameters(hvparams)
2349 except errors.HypervisorError, err:
2350 _Fail(str(err), log=False)
2354 """Demotes the current node from master candidate role.
2357 # try to ensure we're not the master by mistake
2358 master, myself = ssconf.GetMasterAndMyself()
2359 if master == myself:
2360 _Fail("ssconf status shows I'm the master node, will not demote")
2361 pid_file = utils.DaemonPidFileName(constants.MASTERD)
2362 if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2363 _Fail("The master daemon is running, will not demote")
2365 if os.path.isfile(constants.CLUSTER_CONF_FILE):
2366 utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2367 except EnvironmentError, err:
2368 if err.errno != errno.ENOENT:
2369 _Fail("Error while backing up cluster file: %s", err, exc=True)
2370 utils.RemoveFile(constants.CLUSTER_CONF_FILE)
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @param nodes_ip: mapping used to compute the disks' physical IDs
  @type disks: list of L{objects.Disk}
  @param disks: the disks to look up
  @return: list of the corresponding block device objects
  @raises RPCFail: if any device cannot be found

  """
  # set the correct physical ID
  my_name = utils.HostInfo().name
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs
2392 def DrbdDisconnectNet(nodes_ip, disks):
2393 """Disconnects the network on a list of drbd devices.
2396 bdevs = _FindDisks(nodes_ip, disks)
2402 except errors.BlockDeviceError, err:
2403 _Fail("Can't change network configuration to standalone mode: %s",
2407 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2408 """Attaches the network on a list of drbd devices.
2411 bdevs = _FindDisks(nodes_ip, disks)
2414 for idx, rd in enumerate(bdevs):
2416 _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2417 except EnvironmentError, err:
2418 _Fail("Can't create symlink: %s", err)
2419 # reconnect disks, switch to new master configuration and if
2420 # needed primary mode
2423 rd.AttachNet(multimaster)
2424 except errors.BlockDeviceError, err:
2425 _Fail("Can't change network configuration: %s", err)
2426 # wait until the disks are connected; we need to retry the re-attach
2427 # if the device becomes standalone, as this might happen if the one
2428 # node disconnects and reconnects in a different mode before the
2429 # other node reconnects; in this case, one or both of the nodes will
2430 # decide it has wrong configuration and switch to standalone
2431 RECONNECT_TIMEOUT = 2 * 60
2432 sleep_time = 0.100 # start with 100 miliseconds
2433 timeout_limit = time.time() + RECONNECT_TIMEOUT
2434 while time.time() < timeout_limit:
2435 all_connected = True
2437 stats = rd.GetProcStatus()
2438 if not (stats.is_connected or stats.is_in_resync):
2439 all_connected = False
2440 if stats.is_standalone:
2441 # peer had different config info and this node became
2442 # standalone, even though this should not happen with the
2443 # new staged way of changing disk configs
2445 rd.AttachNet(multimaster)
2446 except errors.BlockDeviceError, err:
2447 _Fail("Can't change network configuration: %s", err)
2450 time.sleep(sleep_time)
2451 sleep_time = min(5, sleep_time * 1.5)
2452 if not all_connected:
2453 _Fail("Timeout in disk reconnecting")
2455 # change to primary mode
2459 except errors.BlockDeviceError, err:
2460 _Fail("Can't change to primary mode: %s", err)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @param nodes_ip: node IP mapping, passed to L{_FindDisks}
  @param disks: the list of disks to check
  @rtype: tuple
  @return: (alldone, min_resync) where alldone is True if no device is
      still resyncing, and min_resync is the minimum sync percentage
      across all devices
  @raises RPCFail: if any device is disconnected

  """
  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  @type hypervisor_type: str
  @param hypervisor_type: the hypervisor whose powercycle method is used

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # parent: report back to the RPC caller; the child does the reboot
    return "Reboot scheduled in 5 seconds"
  # give the RPC reply time to reach the master before going down
  time.sleep(5)
  hyper.PowercycleNode()
2501 class HooksRunner(object):
2504 This class is instantiated on the node side (ganeti-noded) and not
2508 RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2510 def __init__(self, hooks_base_dir=None):
2511 """Constructor for hooks runner.
2513 @type hooks_base_dir: str or None
2514 @param hooks_base_dir: if not None, this overrides the
2515 L{constants.HOOKS_BASE_DIR} (useful for unittests)
2518 if hooks_base_dir is None:
2519 hooks_base_dir = constants.HOOKS_BASE_DIR
2520 self._BASE_DIR = hooks_base_dir
2523 def ExecHook(script, env):
2524 """Exec one hook script.
2527 @param script: the full path to the script
2529 @param env: the environment with which to exec the script
2530 @rtype: tuple (success, message)
2531 @return: a tuple of success and message, where success
2532 indicates the succes of the operation, and message
2533 which will contain the error details in case we
2537 # exec the process using subprocess and log the output
2540 fdstdin = open("/dev/null", "r")
2541 child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2542 stderr=subprocess.STDOUT, close_fds=True,
2543 shell=False, cwd="/", env=env)
2546 output = child.stdout.read(4096)
2547 child.stdout.close()
2548 except EnvironmentError, err:
2549 output += "Hook script error: %s" % str(err)
2553 result = child.wait()
2555 except EnvironmentError, err:
2556 if err.errno == errno.EINTR:
2560 # try not to leak fds
2561 for fd in (fdstdin, ):
2565 except EnvironmentError, err:
2566 # just log the error
2567 #logging.exception("Error while closing fd %s", fd)
2570 return result == 0, utils.SafeEncode(output.strip())
2572 def RunHooks(self, hpath, phase, env):
2573 """Run the scripts in the hooks directory.
2576 @param hpath: the path to the hooks directory which
2579 @param phase: either L{constants.HOOKS_PHASE_PRE} or
2580 L{constants.HOOKS_PHASE_POST}
2582 @param env: dictionary with the environment for the hook
2584 @return: list of 3-element tuples:
2586 - script result, either L{constants.HKR_SUCCESS} or
2587 L{constants.HKR_FAIL}
2588 - output of the script
2590 @raise errors.ProgrammerError: for invalid input
2594 if phase == constants.HOOKS_PHASE_PRE:
2596 elif phase == constants.HOOKS_PHASE_POST:
2599 _Fail("Unknown hooks phase '%s'", phase)
2603 subdir = "%s-%s.d" % (hpath, suffix)
2604 dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2606 dir_contents = utils.ListVisibleFiles(dir_name)
2608 # FIXME: must log output in case of failures
2611 # we use the standard python sort order,
2612 # so 00name is the recommended naming scheme
2614 for relname in dir_contents:
2615 fname = os.path.join(dir_name, relname)
2616 if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2617 self.RE_MASK.match(relname) is not None):
2618 rrval = constants.HKR_SKIP
2621 result, output = self.ExecHook(fname, env)
2623 rrval = constants.HKR_FAIL
2625 rrval = constants.HKR_SUCCESS
2626 rr.append(("%s/%s" % (subdir, relname), rrval, output))
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # pass the input data via a temporary file, which we always clean up
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.fdopen(fd, "w").write(idata)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
2671 class DevCacheManager(object):
2672 """Simple class for managing a cache of block device information.
2675 _DEV_PREFIX = "/dev/"
2676 _ROOT_DIR = constants.BDEV_CACHE_DIR
2679 def _ConvertPath(cls, dev_path):
2680 """Converts a /dev/name path to the cache file name.
2682 This replaces slashes with underscores and strips the /dev
2683 prefix. It then returns the full path to the cache file.
2686 @param dev_path: the C{/dev/} path name
2688 @return: the converted path name
2691 if dev_path.startswith(cls._DEV_PREFIX):
2692 dev_path = dev_path[len(cls._DEV_PREFIX):]
2693 dev_path = dev_path.replace("/", "_")
2694 fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2698 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2699 """Updates the cache information for a given device.
2702 @param dev_path: the pathname of the device
2704 @param owner: the owner (instance name) of the device
2705 @type on_primary: bool
2706 @param on_primary: whether this is the primary
2709 @param iv_name: the instance-visible name of the
2710 device, as in objects.Disk.iv_name
2715 if dev_path is None:
2716 logging.error("DevCacheManager.UpdateCache got a None dev_path")
2718 fpath = cls._ConvertPath(dev_path)
2724 iv_name = "not_visible"
2725 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2727 utils.WriteFile(fpath, data=fdata)
2728 except EnvironmentError, err:
2729 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2732 def RemoveCache(cls, dev_path):
2733 """Remove data for a dev_path.
2735 This is just a wrapper over L{utils.RemoveFile} with a converted
2736 path name and logging.
2739 @param dev_path: the pathname of the device
2744 if dev_path is None:
2745 logging.error("DevCacheManager.RemoveCache got a None dev_path")
2747 fpath = cls._ConvertPath(dev_path)
2749 utils.RemoveFile(fpath)
2750 except EnvironmentError, err:
2751 logging.exception("Can't update bdev cache for %s: %s", dev_path, err)