4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon"""
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """
56 def _Fail(msg, *args, **kwargs):
57 """Log an error and the raise an RPCFail exception.
59 This exception is then handled specially in the ganeti daemon and
60 turned into a 'failed' return type. As such, this function is a
61 useful shortcut for logging the error and returning it to the master
65 @param msg: the text of the exception
71 if "exc" in kwargs and kwargs["exc"]:
72 logging.exception(msg)
def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    # payload was sent verbatim
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # payload is base64-wrapped zlib-compressed data
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  # a missing directory simply means nothing to clean
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    # only plain files are removed; directories and symlinks are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  # keep the lock file so a running master does not lose its queue lock
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
158 """Returns master information.
160 This is an utility function to compute master information, either
161 for consumption here or from the node daemon.
164 @return: True, (master_netdev, master_ip, master_name) in case of success
165 @raise RPCFail: in case of errors
170 master_netdev = cfg.GetMasterNetdev()
171 master_ip = cfg.GetMasterIP()
172 master_node = cfg.GetMasterNode()
173 except errors.ConfigurationError, err:
174 _Fail("Cluster configuration incomplete", exc=True)
175 return True, (master_netdev, master_ip, master_node)
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @rtype: tuple
  @return: (success, message) where message joins all collected errors

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()[1]

  payload = []
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      msg = "Someone else has the master ip, not activating"
      logging.error(msg)
      payload.append(msg)
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      msg = "Can't activate master IP: %s" % result.output
      logging.error(msg)
      payload.append(msg)

    # gratuitous ARP so neighbours update their caches quickly
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        msg = "Can't start daemon %s: %s" % (daemon, result.output)
        logging.error(msg)
        payload.append(msg)

  return not payload, "; ".join(payload)
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: tuple
  @return: (True, message)

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()[1]

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True, "Master stopped"
261 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
262 """Joins this node to the cluster.
264 This does the following:
265 - updates the hostkeys of the machine (rsa and dsa)
266 - adds the ssh private key to the user
267 - adds the ssh public key to the users' authorized_keys file
270 @param dsa: the DSA private key to write
272 @param dsapub: the DSA public key to write
274 @param rsa: the RSA private key to write
276 @param rsapub: the RSA public key to write
278 @param sshkey: the SSH private key to write
280 @param sshpub: the SSH public key to write
282 @return: the success of the operation
285 sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
286 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
287 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
288 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
289 for name, content, mode in sshd_keys:
290 utils.WriteFile(name, data=content, mode=mode)
293 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
295 except errors.OpExecError, err:
296 _Fail("Error while processing user ssh files: %s", err, exc=True)
298 for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
299 utils.WriteFile(name, data=content, mode=0600)
301 utils.AddAuthorizedKey(auth_keys, sshpub)
303 utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
305 return (True, "Node added successfully")
def LeaveCluster():
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

    # drop our own public key from authorized_keys, then the key files
    f = open(pub_key, 'r')
    try:
      utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
    finally:
      f.close()

    utils.RemoveFile(priv_key)
    utils.RemoveFile(pub_key)
  except errors.OpExecError:
    # best-effort cleanup: log but continue with the shutdown request
    logging.exception("Error while processing ssh files")

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different informations about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # boot_id changes on every reboot, so it identifies the current boot
  f = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = f.read(128).rstrip("\n")
  finally:
    f.close()

  return True, outputarray
376 def VerifyNode(what, cluster_name):
377 """Verify the status of the local node.
379 Based on the input L{what} parameter, various checks are done on the
382 If the I{filelist} key is present, this list of
383 files is checksummed and the file/checksum pairs are returned.
385 If the I{nodelist} key is present, we check that we have
386 connectivity via ssh with the target nodes (and check the hostname
389 If the I{node-net-test} key is present, we check that we have
390 connectivity to the given nodes via both primary IP and, if
391 applicable, secondary IPs.
394 @param what: a dictionary of things to check:
395 - filelist: list of files for which to compute checksums
396 - nodelist: list of nodes we should check ssh communication with
397 - node-net-test: list of nodes we should check node daemon port
399 - hypervisor: list with hypervisors to run the verify for
401 @return: a dictionary with the same keys as the input dict, and
402 values representing the result of the checks
407 if constants.NV_HYPERVISOR in what:
408 result[constants.NV_HYPERVISOR] = tmp = {}
409 for hv_name in what[constants.NV_HYPERVISOR]:
410 tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
412 if constants.NV_FILELIST in what:
413 result[constants.NV_FILELIST] = utils.FingerprintFiles(
414 what[constants.NV_FILELIST])
416 if constants.NV_NODELIST in what:
417 result[constants.NV_NODELIST] = tmp = {}
418 random.shuffle(what[constants.NV_NODELIST])
419 for node in what[constants.NV_NODELIST]:
420 success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
424 if constants.NV_NODENETTEST in what:
425 result[constants.NV_NODENETTEST] = tmp = {}
426 my_name = utils.HostInfo().name
427 my_pip = my_sip = None
428 for name, pip, sip in what[constants.NV_NODENETTEST]:
434 tmp[my_name] = ("Can't find my own primary/secondary IP"
437 port = utils.GetNodeDaemonPort()
438 for name, pip, sip in what[constants.NV_NODENETTEST]:
440 if not utils.TcpPing(pip, port, source=my_pip):
441 fail.append("primary")
443 if not utils.TcpPing(sip, port, source=my_sip):
444 fail.append("secondary")
446 tmp[name] = ("failure using the %s interface(s)" %
449 if constants.NV_LVLIST in what:
450 result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
452 if constants.NV_INSTANCELIST in what:
453 result[constants.NV_INSTANCELIST] = GetInstanceList(
454 what[constants.NV_INSTANCELIST])
456 if constants.NV_VGLIST in what:
457 result[constants.NV_VGLIST] = utils.ListVolumeGroups()
459 if constants.NV_VERSION in what:
460 result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
461 constants.RELEASE_VERSION)
463 if constants.NV_HVINFO in what:
464 hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
465 result[constants.NV_HVINFO] = hyper.GetNodeInfo()
467 if constants.NV_DRBDLIST in what:
469 used_minors = bdev.DRBD8.GetUsedDevs().keys()
470 except errors.BlockDeviceError, err:
471 logging.warning("Can't get used minors list", exc_info=True)
472 used_minors = str(err)
473 result[constants.NV_DRBDLIST] = used_minors
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr positions: [4] is the state flag, [5] is the open flag
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return True, utils.ListVolumeGroups()
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    # the devices column looks like "/dev/sda(0)"; keep the path only
    if '(' in dev:
      return dev.split('(')[0]
    else:
      return dev

  def map_line(line):
    return {
      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),
    }

  return True, [map_line(line.split('|'))
                for line in result.stdout.splitlines()
                if line.count('|') >= 3]
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: the list of bridges to check for existence
  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    return False, "Missing bridges %s" % (", ".join(missing),)

  return True, None
591 def GetInstanceList(hypervisor_list):
592 """Provides a list of instances.
594 @type hypervisor_list: list
595 @param hypervisor_list: the list of hypervisors to query information
598 @return: a list of all running instances on the current node
599 - instance1.example.com
600 - instance2.example.com
604 for hname in hypervisor_list:
606 names = hypervisor.GetHypervisor(hname).ListInstances()
607 results.extend(names)
608 except errors.HypervisorError, err:
609 _Fail("Error enumerating instances (hypervisor %s): %s",
610 hname, err, exc=True)
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  # iinfo is None when the instance is not running on this node
  if iinfo is not None:
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return True, output
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if instance.name not in hyper.ListInstances():
    return (False, 'not running')

  # all disk symlinks must exist; instances started before ganeti 1.2.5
  # lack them and cannot be migrated safely
  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
    if not os.path.islink(link_name):
      return (False, 'not restarted since ganeti 1.2.5')

  return (True, '')
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, inst_id, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return True, output
708 def InstanceOsAdd(instance, reinstall):
709 """Add an OS to an instance.
711 @type instance: L{objects.Instance}
712 @param instance: Instance whose OS is to be installed
713 @type reinstall: boolean
714 @param reinstall: whether this is an instance reinstall
716 @return: the success of the operation
720 inst_os = OSFromDisk(instance.os)
721 except errors.InvalidOS, err:
722 os_name, os_dir, os_err = err.args
724 return (False, "Can't find OS '%s': %s" % (os_name, os_err))
726 return (False, "Error parsing OS '%s' in directory %s: %s" %
727 (os_name, os_dir, os_err))
729 create_env = OSEnvironment(instance)
731 create_env['INSTANCE_REINSTALL'] = "1"
733 logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
734 instance.name, int(time.time()))
736 result = utils.RunCmd([inst_os.create_script], env=create_env,
737 cwd=inst_os.path, output=logfile,)
739 logging.error("os create command '%s' returned error: %s, logfile: %s,"
740 " output: %s", result.cmd, result.fail_reason, logfile,
742 lines = [utils.SafeEncode(val)
743 for val in utils.TailFile(logfile, lines=20)]
744 return (False, "OS create script failed (%s), last lines in the"
745 " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
747 return (True, "Successfully installed")
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # note: this is the rename script, not the create one (fixed log text)
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    return (False, "OS rename script failed (%s), last lines in the"
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))

  return (True, "Rename successful")
784 def _GetVGInfo(vg_name):
785 """Get informations about the volume group.
788 @param vg_name: the volume group which we query
791 A dictionary with the following keys:
792 - C{vg_size} is the total size of the volume group in MiB
793 - C{vg_free} is the free size of the volume group in MiB
794 - C{pv_count} are the number of physical disks in that VG
796 If an error occurs during gathering of data, we return the same dict
797 with keys all set to None.
800 retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
802 retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
803 "--nosuffix", "--units=m", "--separator=:", vg_name])
806 logging.error("volume group %s not present", vg_name)
808 valarr = retval.stdout.strip().rstrip(':').split(':')
812 "vg_size": int(round(float(valarr[0]), 0)),
813 "vg_free": int(round(float(valarr[1]), 0)),
814 "pv_count": int(valarr[2]),
816 except ValueError, err:
817 logging.exception("Fail to parse vgs output")
819 logging.error("vgs output has the wrong number of fields (expected"
820 " three): %s", str(valarr))
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for an instance's disk.

  @type instance_name: str
  @param instance_name: the name of the owning instance
  @type idx: int
  @param idx: the disk index
  @rtype: str
  @return: path under DISK_LINKS_DIR named "<instance>:<idx>"

  """
  return os.path.join(constants.DISK_LINKS_DIR,
                      "%s:%d" % (instance_name, idx))
829 def _SymlinkBlockDev(instance_name, device_path, idx):
830 """Set up symlinks to a instance's block device.
832 This is an auxiliary function run when an instance is start (on the primary
833 node) or when an instance is migrated (on the target node).
836 @param instance_name: the name of the target instance
837 @param device_path: path of the physical block device, on the node
838 @param idx: the disk index
839 @return: absolute path to the disk's symlink
842 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
844 os.symlink(device_path, link_name)
846 if err.errno == errno.EEXIST:
847 if (not os.path.islink(link_name) or
848 os.readlink(link_name) != device_path):
850 os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @param instance_name: the name of the owning instance
  @param disks: list of the instance's disk objects

  """
  for idx, disk in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        # best-effort: log and continue with the remaining links
        logging.exception("Can't remove symlink '%s'", link_name)
870 def _GatherAndLinkBlockDevs(instance):
871 """Set up an instance's block device(s).
873 This is run on the primary node at instance startup. The block
874 devices must be already assembled.
876 @type instance: L{objects.Instance}
877 @param instance: the instance whose disks we shoul assemble
879 @return: list of (disk_object, device_path)
883 for idx, disk in enumerate(instance.disks):
884 device = _RecursiveFindBD(disk)
886 raise errors.BlockDeviceError("Block device '%s' is not set up." %
890 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
892 raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
895 block_devices.append((disk, link_name))
900 def StartInstance(instance):
901 """Start an instance.
903 @type instance: L{objects.Instance}
904 @param instance: the instance object
906 @return: whether the startup was successful or not
909 running_instances = GetInstanceList([instance.hypervisor])
911 if instance.name in running_instances:
912 return (True, "Already running")
915 block_devices = _GatherAndLinkBlockDevs(instance)
916 hyper = hypervisor.GetHypervisor(instance.hypervisor)
917 hyper.StartInstance(instance, block_devices)
918 except errors.BlockDeviceError, err:
919 _Fail("Block device error: %s", err, exc=True)
920 except errors.HypervisorError, err:
921 _RemoveBlockDevLinks(instance.name, instance.disks)
922 _Fail("Hypervisor error: %s", err, exc=True)
924 return (True, "Instance started successfully")
927 def InstanceShutdown(instance):
928 """Shut an instance down.
930 @note: this functions uses polling with a hardcoded timeout.
932 @type instance: L{objects.Instance}
933 @param instance: the instance object
935 @return: whether the startup was successful or not
938 hv_name = instance.hypervisor
939 running_instances = GetInstanceList([hv_name])
941 if instance.name not in running_instances:
942 return (True, "Instance already stopped")
944 hyper = hypervisor.GetHypervisor(hv_name)
946 hyper.StopInstance(instance)
947 except errors.HypervisorError, err:
948 _Fail("Failed to stop instance %s: %s", instance.name, err)
950 # test every 10secs for 2min
953 for dummy in range(11):
954 if instance.name not in GetInstanceList([hv_name]):
958 # the shutdown did not succeed
959 logging.error("Shutdown of '%s' unsuccessful, using destroy",
963 hyper.StopInstance(instance, force=True)
964 except errors.HypervisorError, err:
965 _Fail("Failed to force stop instance %s: %s", instance.name, err)
968 if instance.name in GetInstanceList([hv_name]):
969 _Fail("Could not shutdown instance %s even by destroy", instance.name)
971 _RemoveBlockDevLinks(instance.name, instance.disks)
973 return (True, "Instance has been shutdown successfully")
976 def InstanceReboot(instance, reboot_type):
977 """Reboot an instance.
979 @type instance: L{objects.Instance}
980 @param instance: the instance object to reboot
981 @type reboot_type: str
982 @param reboot_type: the type of reboot, one the following
984 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
985 instance OS, do not recreate the VM
986 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
987 restart the VM (at the hypervisor level)
988 - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
989 is not accepted here, since that mode is handled
992 @return: the success of the operation
995 running_instances = GetInstanceList([instance.hypervisor])
997 if instance.name not in running_instances:
998 _Fail("Cannot reboot instance %s that is not running", instance.name)
1000 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1001 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1003 hyper.RebootInstance(instance)
1004 except errors.HypervisorError, err:
1005 _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1006 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1008 stop_result = InstanceShutdown(instance)
1009 if not stop_result[0]:
1011 return StartInstance(instance)
1012 except errors.HypervisorError, err:
1013 _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1015 _Fail("Invalid reboot_type received: %s", reboot_type)
1017 return (True, "Reboot successful")
1020 def MigrationInfo(instance):
1021 """Gather information about an instance to be migrated.
1023 @type instance: L{objects.Instance}
1024 @param instance: the instance definition
1027 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1029 info = hyper.MigrationInfo(instance)
1030 except errors.HypervisorError, err:
1031 _Fail("Failed to fetch migration information: %s", err, exc=True)
1035 def AcceptInstance(instance, info, target):
1036 """Prepare the node to accept an instance.
1038 @type instance: L{objects.Instance}
1039 @param instance: the instance definition
1040 @type info: string/data (opaque)
1041 @param info: migration information, from the source node
1042 @type target: string
1043 @param target: target host (usually ip), on this node
1046 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1048 hyper.AcceptInstance(instance, info, target)
1049 except errors.HypervisorError, err:
1050 _Fail("Failed to accept instance: %s", err, exc=True)
1051 return (True, "Accept successfull")
1054 def FinalizeMigration(instance, info, success):
1055 """Finalize any preparation to accept an instance.
1057 @type instance: L{objects.Instance}
1058 @param instance: the instance definition
1059 @type info: string/data (opaque)
1060 @param info: migration information, from the source node
1061 @type success: boolean
1062 @param success: whether the migration was a success or a failure
1065 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1067 hyper.FinalizeMigration(instance, info, success)
1068 except errors.HypervisorError, err:
1069 _Fail("Failed to finalize migration: %s", err, exc=True)
1070 return (True, "Migration Finalized")
1073 def MigrateInstance(instance, target, live):
1074 """Migrates an instance to another node.
1076 @type instance: L{objects.Instance}
1077 @param instance: the instance definition
1078 @type target: string
1079 @param target: the target node name
1081 @param live: whether the migration should be done live or not (the
1082 interpretation of this parameter is left to the hypervisor)
1084 @return: a tuple of (success, msg) where:
1085 - succes is a boolean denoting the success/failure of the operation
1086 - msg is a string with details in case of failure
1089 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1092 hyper.MigrateInstance(instance.name, target, live)
1093 except errors.HypervisorError, err:
1094 _Fail("Failed to migrate instance: %s", err, exc=True)
1095 return (True, "Migration successfull")
1098 def BlockdevCreate(disk, size, owner, on_primary, info):
1099 """Creates a block device for an instance.
1101 @type disk: L{objects.Disk}
1102 @param disk: the object describing the disk we should create
1104 @param size: the size of the physical underlying device, in MiB
1106 @param owner: the name of the instance for which disk is created,
1107 used for device cache data
1108 @type on_primary: boolean
1109 @param on_primary: indicates if it is the primary node or not
1111 @param info: string that will be sent to the physical device
1112 creation, used for example to set (LVM) tags on LVs
1114 @return: the new unique_id of the device (this can sometime be
1115 computed only after creation), or None. On secondary nodes,
1116 it's not required to return anything.
1121 for child in disk.children:
1123 crdev = _RecursiveAssembleBD(child, owner, on_primary)
1124 except errors.BlockDeviceError, err:
1125 _Fail("Can't assemble device %s: %s", child, err)
1126 if on_primary or disk.AssembleOnSecondary():
1127 # we need the children open in case the device itself has to
1131 except errors.BlockDeviceError, err:
1132 _Fail("Can't make child '%s' read-write: %s", child, err)
1136 device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1137 except errors.BlockDeviceError, err:
1138 _Fail("Can't create block device: %s", err)
1140 if on_primary or disk.AssembleOnSecondary():
1143 except errors.BlockDeviceError, err:
1144 _Fail("Can't assemble device after creation, unusual event: %s", err)
1145 device.SetSyncSpeed(constants.SYNC_SPEED)
1146 if on_primary or disk.OpenOnSecondary():
1148 device.Open(force=True)
1149 except errors.BlockDeviceError, err:
1150 _Fail("Can't make device r/w after creation, unusual event: %s", err)
1151 DevCacheManager.UpdateCache(device.dev_path, owner,
1152 on_primary, disk.iv_name)
1154 device.SetInfo(info)
1156 physical_id = device.unique_id
1157 return True, physical_id
1160 def BlockdevRemove(disk):
1161 """Remove a block device.
1163 @note: This is intended to be called recursively.
1165 @type disk: L{objects.Disk}
1166 @param disk: the disk object we should remove
1168 @return: the success of the operation
1174 rdev = _RecursiveFindBD(disk)
1175 except errors.BlockDeviceError, err:
1176 # probably can't attach
1177 logging.info("Can't attach to device %s in remove", disk)
1179 if rdev is not None:
1180 r_path = rdev.dev_path
1183 except errors.BlockDeviceError, err:
1184 msgs.append(str(err))
1187 DevCacheManager.RemoveCache(r_path)
1190 for child in disk.children:
1191 c_status, c_msg = BlockdevRemove(child)
1192 result = result and c_status
1193 if c_msg: # not an empty message
1196 return (result, "; ".join(msgs))
# NOTE(review): excerpt elides original lines (numbering gaps); code kept
# byte-for-byte below.
1199 def _RecursiveAssembleBD(disk, owner, as_primary):
1200 """Activate a block device for an instance.
1202 This is run on the primary and secondary nodes for an instance.
1204 @note: this function is called recursively.
1206 @type disk: L{objects.Disk}
1207 @param disk: the disk we try to assemble
1209 @param owner: the name of the instance which owns the disk
1210 @type as_primary: boolean
1211 @param as_primary: if we should make the block device
1214 @return: the assembled device or None (in case no device
1216 @raise errors.BlockDeviceError: in case there is an error
1217 during the activation of the children or the device
# mcn ends up as the maximum number of children that may fail to
# assemble (appear as None) before we give up on this device.
1223 mcn = disk.ChildrenNeeded()
1225 mcn = 0 # max number of Nones allowed
1227 mcn = len(disk.children) - mcn # max number of Nones
1228 for chld_disk in disk.children:
1230 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1231 except errors.BlockDeviceError, err:
# Too many failed children: the error is fatal (re-raise is on an
# elided line); otherwise log and continue with cdev as None.
1232 if children.count(None) >= mcn:
1235 logging.error("Error in child activation (but continuing): %s",
1237 children.append(cdev)
1239 if as_primary or disk.AssembleOnSecondary():
1240 r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1241 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1243 if as_primary or disk.OpenOnSecondary():
1245 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1246 as_primary, disk.iv_name)
# NOTE(review): excerpt elides original lines (numbering gaps); the
# assignment of `status` returned at 1272 is on an elided line — TODO
# confirm against the full source.
1253 def BlockdevAssemble(disk, owner, as_primary):
1254 """Activate a block device for an instance.
1256 This is a wrapper over _RecursiveAssembleBD.
1258 @rtype: str or boolean
1259 @return: a C{/dev/...} path for primary nodes, and
1260 C{True} for secondary nodes
1264 result = "no error information"
1266 result = _RecursiveAssembleBD(disk, owner, as_primary)
# On primary nodes the assembled BlockDev is translated to its /dev path.
1267 if isinstance(result, bdev.BlockDev):
1268 result = result.dev_path
1269 except errors.BlockDeviceError, err:
1270 result = "Error while assembling disk: %s" % str(err)
1272 return (status, result)
# NOTE(review): excerpt elides original lines (numbering gaps); code kept
# byte-for-byte below.
1275 def BlockdevShutdown(disk):
1276 """Shut down a block device.
1278 First, if the device is assembled (Attach() is successful), then
1279 the device is shutdown. Then the children of the device are
1282 This function is called recursively. Note that we don't cache the
1283 children or such, as opposed to assemble, shutdown of different
1284 devices doesn't require that the upper device was active.
1286 @type disk: L{objects.Disk}
1287 @param disk: the description of the disk we should
1290 @return: the success of the operation
1295 r_dev = _RecursiveFindBD(disk)
1296 if r_dev is not None:
1297 r_path = r_dev.dev_path
# Forget the cached path for the device once it has been shut down.
1300 DevCacheManager.RemoveCache(r_path)
1301 except errors.BlockDeviceError, err:
1302 msgs.append(str(err))
# Recurse into children; overall result is the AND of child results.
1306 for child in disk.children:
1307 c_status, c_msg = BlockdevShutdown(child)
1308 result = result and c_status
1309 if c_msg: # not an empty message
1312 return (result, "; ".join(msgs))
1315 def BlockdevAddchildren(parent_cdev, new_cdevs):
1316 """Extend a mirrored block device.
1318 @type parent_cdev: L{objects.Disk}
1319 @param parent_cdev: the disk to which we should add children
1320 @type new_cdevs: list of L{objects.Disk}
1321 @param new_cdevs: the list of children which we should add
1323 @return: the success of the operation
# The parent and every new child must already be resolvable to real
# block devices; otherwise the RPC is failed via _Fail.
1326 parent_bdev = _RecursiveFindBD(parent_cdev)
1327 if parent_bdev is None:
1328 _Fail("Can't find parent device '%s' in add children", parent_cdev)
1329 new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1330 if new_bdevs.count(None) > 0:
1331 _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1332 parent_bdev.AddChildren(new_bdevs)
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# branch using StaticDevPath() at 1353); code kept byte-for-byte below.
1336 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1337 """Shrink a mirrored block device.
1339 @type parent_cdev: L{objects.Disk}
1340 @param parent_cdev: the disk from which we should remove children
1341 @type new_cdevs: list of L{objects.Disk}
1342 @param new_cdevs: the list of children which we should remove
1344 @return: the success of the operation
1347 parent_bdev = _RecursiveFindBD(parent_cdev)
1348 if parent_bdev is None:
1349 _Fail("Can't find parent device '%s' in remove children", parent_cdev)
# Build the list of child device paths to detach from the parent.
1351 for disk in new_cdevs:
1352 rpath = disk.StaticDevPath()
1354 bd = _RecursiveFindBD(disk)
1356 _Fail("Can't find device %s while removing children", disk)
1358 devs.append(bd.dev_path)
1361 parent_bdev.RemoveChildren(devs)
1365 def BlockdevGetmirrorstatus(disks):
1366 """Get the mirroring status of a list of devices.
1368 @type disks: list of L{objects.Disk}
1369 @param disks: the list of disks which we should query
1372 a list of (mirror_done, estimated_time) tuples, which
1373 are the result of L{bdev.BlockDev.CombinedSyncStatus}
1374 @raise errors.BlockDeviceError: if any of the disks cannot be
# Each disk must map to an existing device; otherwise fail the RPC.
1380 rbd = _RecursiveFindBD(dsk)
1382 _Fail("Can't find device %s", dsk)
1383 stats.append(rbd.CombinedSyncStatus())
1387 def _RecursiveFindBD(disk):
1388 """Check if a device is activated.
1390 If so, return information about the real device.
1392 @type disk: L{objects.Disk}
1393 @param disk: the disk object we need to find
1395 @return: None if the device can't be found,
1396 otherwise the device instance
# Resolve all children first, then attach to the device itself.
1401 for chdisk in disk.children:
1402 children.append(_RecursiveFindBD(chdisk))
1404 return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# try: for the except at 1422 and the rbd-is-None branch); code kept
# byte-for-byte below.
1407 def BlockdevFind(disk):
1408 """Check if a device is activated.
1410 If it is, return information about the real device.
1412 @type disk: L{objects.Disk}
1413 @param disk: the disk to find
1414 @rtype: None or tuple
1415 @return: None if the disk cannot be found, otherwise a
1416 tuple (device_path, major, minor, sync_percent,
1417 estimated_time, is_degraded)
1421 rbd = _RecursiveFindBD(disk)
1422 except errors.BlockDeviceError, err:
1423 _Fail("Failed to find device: %s", err, exc=True)
1426 return (True, (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus())
1429 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1430 """Write a file to the filesystem.
1432 This allows the master to overwrite(!) a file. It will only perform
1433 the operation if the file belongs to a list of configuration files.
1435 @type file_name: str
1436 @param file_name: the target file name
1438 @param data: the new contents of the file
1440 @param mode: the mode to give the file (can be None)
1442 @param uid: the owner of the file (can be -1 for default)
1444 @param gid: the group of the file (can be -1 for default)
1446 @param atime: the atime to set on the file (can be None)
1448 @param mtime: the mtime to set on the file (can be None)
1450 @return: the success of the operation; errors are logged
1451 in the node daemon log
1454 if not os.path.isabs(file_name):
1455 _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
# Whitelist of files the master is allowed to overwrite on this node;
# hypervisor-specific ancillary files are added below.
1457 allowed_files = set([
1458 constants.CLUSTER_CONF_FILE,
1459 constants.ETC_HOSTS,
1460 constants.SSH_KNOWN_HOSTS_FILE,
1461 constants.VNC_PASSWORD_FILE,
1462 constants.RAPI_CERT_FILE,
1463 constants.RAPI_USERS_FILE,
1466 for hv_name in constants.HYPER_TYPES:
1467 hv_class = hypervisor.GetHypervisor(hv_name)
1468 allowed_files.update(hv_class.GetAncillaryFiles())
1470 if file_name not in allowed_files:
1471 _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
# The wire payload may be compressed; _Decompress handles both cases.
1474 raw_data = _Decompress(data)
1476 utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1477 atime=atime, mtime=mtime)
1478 return (True, "success")
1481 def WriteSsconfFiles(values):
1482 """Update all ssconf files.
1484 Wrapper around the SimpleStore.WriteFiles.
1487 ssconf.SimpleStore().WriteFiles(values)
# NOTE(review): excerpt elides original lines (numbering gaps); the
# else-branch and return statement are not visible here.
1490 def _ErrnoOrStr(err):
1491 """Format an EnvironmentError exception.
1493 If the L{err} argument has an errno attribute, it will be looked up
1494 and converted into a textual C{E...} description. Otherwise the
1495 string representation of the error will be returned.
1497 @type err: L{EnvironmentError}
1498 @param err: the exception to format
# Map the numeric errno to its symbolic name, e.g. 2 -> 'ENOENT'.
1501 if hasattr(err, 'errno'):
1502 detail = errno.errorcode[err.errno]
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# try:/file-open around the readlines at 1540); code kept byte-for-byte.
1508 def _OSOndiskVersion(name, os_dir):
1509 """Compute and return the API version of a given OS.
1511 This function will try to read the API version of the OS given by
1512 the 'name' parameter and residing in the 'os_dir' directory.
1515 @param name: the OS name we should look for
1517 @param os_dir: the directory in which we should look for the OS
1520 Either an integer denoting the version or None in the
1521 case when this is not a valid OS name.
1522 @raise errors.InvalidOS: if the OS cannot be found
1525 api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1528 st = os.stat(api_file)
1529 except EnvironmentError, err:
1530 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1531 " found (%s)" % _ErrnoOrStr(err))
# The version file must be a regular file, not a symlink target of
# something odd like a directory or device node.
1533 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1534 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1540 api_versions = f.readlines()
1543 except EnvironmentError, err:
1544 raise errors.InvalidOS(name, os_dir, "error while reading the"
1545 " API version (%s)" % _ErrnoOrStr(err))
# One integer version per line; whitespace is stripped before parsing.
1547 api_versions = [version.strip() for version in api_versions]
1549 api_versions = [int(version) for version in api_versions]
1550 except (TypeError, ValueError), err:
1551 raise errors.InvalidOS(name, os_dir,
1552 "API version is not integer (%s)" % str(err))
# NOTE(review): excerpt elides original lines (numbering gaps); code kept
# byte-for-byte below.
1557 def DiagnoseOS(top_dirs=None):
1558 """Compute the validity for all OSes.
1560 @type top_dirs: list
1561 @param top_dirs: the list of directories in which to
1562 search (if not given defaults to
1563 L{constants.OS_SEARCH_PATH})
1564 @rtype: list of L{objects.OS}
1565 @return: an OS object for each name in all the given
1569 if top_dirs is None:
1570 top_dirs = constants.OS_SEARCH_PATH
1573 for dir_name in top_dirs:
1574 if os.path.isdir(dir_name):
1576 f_names = utils.ListVisibleFiles(dir_name)
1577 except EnvironmentError, err:
1578 logging.exception("Can't list the OS directory %s", dir_name)
# Invalid OS definitions are reported as objects too (FromInvalidOS)
# rather than aborting the whole diagnosis.
1580 for name in f_names:
1582 os_inst = OSFromDisk(name, base_dir=dir_name)
1583 result.append(os_inst)
1584 except errors.InvalidOS, err:
1585 result.append(objects.OS.FromInvalidOS(err))
# NOTE(review): excerpt elides original lines (numbering gaps); code kept
# byte-for-byte below.
1590 def OSFromDisk(name, base_dir=None):
1591 """Create an OS instance from disk.
1593 This function will return an OS instance if the given name is a
1594 valid OS name. Otherwise, it will raise an appropriate
1595 L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1597 @type base_dir: string
1598 @keyword base_dir: Base directory containing OS installations.
1599 Defaults to a search in all the OS_SEARCH_PATH dirs.
1600 @rtype: L{objects.OS}
1601 @return: the OS instance if we find a valid one
1602 @raise errors.InvalidOS: if we don't find a valid OS
1605 if base_dir is None:
1606 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1608 raise errors.InvalidOS(name, None, "OS dir not found in search path")
1610 os_dir = os.path.sep.join([base_dir, name])
1612 api_versions = _OSOndiskVersion(name, os_dir)
1614 if constants.OS_API_VERSION not in api_versions:
1615 raise errors.InvalidOS(name, os_dir, "API version mismatch"
1616 " (found %s want %s)"
1617 % (api_versions, constants.OS_API_VERSION))
1619 # OS Scripts dictionary, we will populate it with the actual script names
1620 os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
# Every OS script must exist, be a regular file and be executable by
# its owner; otherwise the OS definition is rejected.
1622 for script in os_scripts:
1623 os_scripts[script] = os.path.sep.join([os_dir, script])
1626 st = os.stat(os_scripts[script])
1627 except EnvironmentError, err:
1628 raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1629 (script, _ErrnoOrStr(err)))
1631 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1632 raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1635 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1636 raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1640 return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1641 create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1642 export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1643 import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1644 rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1645 api_versions=api_versions)
# NOTE(review): excerpt elides original lines (numbering gaps); code kept
# byte-for-byte below.
1647 def OSEnvironment(instance, debug=0):
1648 """Calculate the environment for an os script.
1650 @type instance: L{objects.Instance}
1651 @param instance: target instance for the os script run
1652 @type debug: integer
1653 @param debug: debug level (0 or 1, for OS Api 10)
1655 @return: dict of environment variables
1656 @raise errors.BlockDeviceError: if the block device
1661 result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1662 result['INSTANCE_NAME'] = instance.name
1663 result['INSTANCE_OS'] = instance.os
1664 result['HYPERVISOR'] = instance.hypervisor
1665 result['DISK_COUNT'] = '%d' % len(instance.disks)
1666 result['NIC_COUNT'] = '%d' % len(instance.nics)
1667 result['DEBUG_LEVEL'] = '%d' % debug
# Per-disk variables: every disk must be assembled, since the OS
# scripts need a real /dev path to work on.
1668 for idx, disk in enumerate(instance.disks):
1669 real_disk = _RecursiveFindBD(disk)
1670 if real_disk is None:
1671 raise errors.BlockDeviceError("Block device '%s' is not set up" %
1674 result['DISK_%d_PATH' % idx] = real_disk.dev_path
1675 result['DISK_%d_ACCESS' % idx] = disk.mode
1676 if constants.HV_DISK_TYPE in instance.hvparams:
1677 result['DISK_%d_FRONTEND_TYPE' % idx] = \
1678 instance.hvparams[constants.HV_DISK_TYPE]
1679 if disk.dev_type in constants.LDS_BLOCK:
1680 result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1681 elif disk.dev_type == constants.LD_FILE:
1682 result['DISK_%d_BACKEND_TYPE' % idx] = \
1683 'file:%s' % disk.physical_id[0]
# Per-NIC variables; bridge/link info only for bridged NICs.
1684 for idx, nic in enumerate(instance.nics):
1685 result['NIC_%d_MAC' % idx] = nic.mac
1687 result['NIC_%d_IP' % idx] = nic.ip
1688 result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1689 if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1690 result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1691 if nic.nicparams[constants.NIC_LINK]:
1692 result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1693 if constants.HV_NIC_TYPE in instance.hvparams:
1694 result['NIC_%d_FRONTEND_TYPE' % idx] = \
1695 instance.hvparams[constants.HV_NIC_TYPE]
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# try:/Grow call before the except at 1719); code kept byte-for-byte.
1699 def BlockdevGrow(disk, amount):
1700 """Grow a stack of block devices.
1702 This function is called recursively, with the children being the
1703 first ones to resize.
1705 @type disk: L{objects.Disk}
1706 @param disk: the disk to be grown
1707 @rtype: (status, result)
1708 @return: a tuple with the status of the operation
1709 (True/False), and the errors message if status
1713 r_dev = _RecursiveFindBD(disk)
1715 return False, "Cannot find block device %s" % (disk,)
1719 except errors.BlockDeviceError, err:
1720 _Fail("Failed to grow block device: %s", err, exc=True)
# NOTE(review): excerpt elides original lines (numbering gaps); code kept
# byte-for-byte below.
1725 def BlockdevSnapshot(disk):
1726 """Create a snapshot copy of a block device.
1728 This function is called recursively, and the snapshot is actually created
1729 just for the leaf lvm backend device.
1731 @type disk: L{objects.Disk}
1732 @param disk: the disk to be snapshotted
1734 @return: snapshot disk path
# Non-LVM devices are walked down until an LVM leaf is reached; only
# LVM volumes can actually be snapshotted.
1738 if len(disk.children) == 1:
1739 # only one child, let's recurse on it
1740 return BlockdevSnapshot(disk.children[0])
1742 # more than one child, choose one that matches
1743 for child in disk.children:
1744 if child.size == disk.size:
1745 # return implies breaking the loop
1746 return BlockdevSnapshot(child)
1747 elif disk.dev_type == constants.LD_LV:
1748 r_dev = _RecursiveFindBD(disk)
1749 if r_dev is not None:
1750 # let's stay on the safe side and ask for the full size, for now
1751 return True, r_dev.Snapshot(disk.size)
1753 _Fail("Cannot find block device %s", disk)
1755 _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1756 disk.unique_id, disk.dev_type)
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# compression command assignment used at 1813); code kept byte-for-byte.
1759 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1760 """Export a block device snapshot to a remote node.
1762 @type disk: L{objects.Disk}
1763 @param disk: the description of the disk to export
1764 @type dest_node: str
1765 @param dest_node: the destination node to export to
1766 @type instance: L{objects.Instance}
1767 @param instance: the instance object to whom the disk belongs
1768 @type cluster_name: str
1769 @param cluster_name: the cluster name, needed for SSH hostalias
1771 @param idx: the index of the disk in the instance's disk list,
1772 used to export to the OS scripts environment
1774 @return: the success of the operation
1777 export_env = OSEnvironment(instance)
1779 inst_os = OSFromDisk(instance.os)
1780 export_script = inst_os.export_script
1782 logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1783 instance.name, int(time.time()))
1784 if not os.path.exists(constants.LOG_OS_DIR):
1785 os.mkdir(constants.LOG_OS_DIR, 0750)
1786 real_disk = _RecursiveFindBD(disk)
1787 if real_disk is None:
1788 _Fail("Block device '%s' is not set up", disk)
1792 export_env['EXPORT_DEVICE'] = real_disk.dev_path
1793 export_env['EXPORT_INDEX'] = str(idx)
1795 destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1796 destfile = disk.physical_id[1]
1798 # the target command is built out of three individual commands,
1799 # which are joined by pipes; we check each individual command for
# (…validity; each piece is shell-quoted via BuildShellCmd.)
1801 expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1802 export_script, logfile)
1806 destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1807 destdir, destdir, destfile)
1808 remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1809 constants.GANETI_RUNAS,
1812 # all commands have been checked, so we're safe to combine them
1813 command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1815 result = utils.RunCmd(command, env=export_env)
1818 _Fail("OS snapshot export command '%s' returned error: %s"
1819 " output: %s", command, result.fail_reason, result.output)
# NOTE(review): excerpt elides original lines (numbering gaps); code kept
# byte-for-byte below.
1824 def FinalizeExport(instance, snap_disks):
1825 """Write out the export configuration information.
1827 @type instance: L{objects.Instance}
1828 @param instance: the instance which we export, used for
1829 saving configuration
1830 @type snap_disks: list of L{objects.Disk}
1831 @param snap_disks: list of snapshot block devices, which
1832 will be used to get the actual name of the dump file
1835 @return: the success of the operation
# The export is staged in "<name>.new" and atomically moved over the
# final "<name>" directory at the end (see shutil.move below).
1838 destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1839 finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1841 config = objects.SerializableConfigParser()
1843 config.add_section(constants.INISECT_EXP)
1844 config.set(constants.INISECT_EXP, 'version', '0')
1845 config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1846 config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1847 config.set(constants.INISECT_EXP, 'os', instance.os)
1848 config.set(constants.INISECT_EXP, 'compression', 'gzip')
1850 config.add_section(constants.INISECT_INS)
1851 config.set(constants.INISECT_INS, 'name', instance.name)
1852 config.set(constants.INISECT_INS, 'memory', '%d' %
1853 instance.beparams[constants.BE_MEMORY])
1854 config.set(constants.INISECT_INS, 'vcpus', '%d' %
1855 instance.beparams[constants.BE_VCPUS])
1856 config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1859 for nic_count, nic in enumerate(instance.nics):
1861 config.set(constants.INISECT_INS, 'nic%d_mac' %
1862 nic_count, '%s' % nic.mac)
1863 config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1864 config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1866 # TODO: redundant: on load can read nics until it doesn't exist
1867 config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1870 for disk_count, disk in enumerate(snap_disks):
1873 config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1874 ('%s' % disk.iv_name))
1875 config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1876 ('%s' % disk.physical_id[1]))
1877 config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1880 config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1882 utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1883 data=config.Dumps())
# Replace any previous export for this instance, then publish the new one.
1884 shutil.rmtree(finaldestdir, True)
1885 shutil.move(destdir, finaldestdir)
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# config.read() call between 1903 and 1906); code kept byte-for-byte.
1890 def ExportInfo(dest):
1891 """Get export configuration information.
1894 @param dest: directory containing the export
1896 @rtype: L{objects.SerializableConfigParser}
1897 @return: a serializable config file containing the
1901 cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1903 config = objects.SerializableConfigParser()
# Both the export and instance sections are mandatory in a valid
# export configuration file.
1906 if (not config.has_section(constants.INISECT_EXP) or
1907 not config.has_section(constants.INISECT_INS)):
1908 _Fail("Export info file doesn't have the required fields")
1910 return True, config.Dumps()
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# comprcmd assignment used at 1946); code kept byte-for-byte below.
1913 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1914 """Import an os image into an instance.
1916 @type instance: L{objects.Instance}
1917 @param instance: instance to import the disks into
1918 @type src_node: string
1919 @param src_node: source node for the disk images
1920 @type src_images: list of string
1921 @param src_images: absolute paths of the disk images
1922 @rtype: list of boolean
1923 @return: each boolean represent the success of importing the n-th disk
1926 import_env = OSEnvironment(instance)
1927 inst_os = OSFromDisk(instance.os)
1928 import_script = inst_os.import_script
1930 logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1931 instance.name, int(time.time()))
1932 if not os.path.exists(constants.LOG_OS_DIR):
1933 os.mkdir(constants.LOG_OS_DIR, 0750)
1936 impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1937 import_script, logfile)
# One remote-cat | decompress | import pipeline per disk image; errors
# are collected per disk rather than aborting the loop.
1940 for idx, image in enumerate(src_images):
1942 destcmd = utils.BuildShellCmd('cat %s', image)
1943 remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1944 constants.GANETI_RUNAS,
1946 command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1947 import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1948 import_env['IMPORT_INDEX'] = str(idx)
1949 result = utils.RunCmd(command, env=import_env)
1951 logging.error("Disk import command '%s' returned error: %s"
1952 " output: %s", command, result.fail_reason,
1954 final_result.append("error importing disk %d: %s, %s" %
1955 (idx, result.fail_reason, result.output[-100]))
1958 return False, "; ".join(final_result)
# NOTE(review): the def line of this export-listing function is elided
# from this excerpt (numbering gap before 1963); body kept byte-for-byte.
1963 """Return a list of exports currently available on this machine.
1966 @return: list of the exports
# Each subdirectory of EXPORT_DIR is one export, named after the instance.
1969 if os.path.isdir(constants.EXPORT_DIR):
1970 return True, utils.ListVisibleFiles(constants.EXPORT_DIR)
1972 return False, "No exports directory"
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# try: opening the except at 1988); code kept byte-for-byte below.
1975 def RemoveExport(export):
1976 """Remove an existing export from the node.
1979 @param export: the name of the export to remove
1981 @return: the success of the operation
1984 target = os.path.join(constants.EXPORT_DIR, export)
1987 shutil.rmtree(target)
1988 except EnvironmentError, err:
1989 _Fail("Error while removing the export: %s", err, exc=True)
# NOTE(review): excerpt elides original lines (numbering gaps); code kept
# byte-for-byte below.
1994 def BlockdevRename(devlist):
1995 """Rename a list of block devices.
1997 @type devlist: list of tuples
1998 @param devlist: list of tuples of the form (disk,
1999 new_logical_id, new_physical_id); disk is an
2000 L{objects.Disk} object describing the current disk,
2001 and new logical_id/physical_id is the name we
2004 @return: True if all renames succeeded, False otherwise
2009 for disk, unique_id in devlist:
2010 dev = _RecursiveFindBD(disk)
2012 msgs.append("Can't find device %s in rename" % str(disk))
2016 old_rpath = dev.dev_path
2017 dev.Rename(unique_id)
2018 new_rpath = dev.dev_path
# If the device path changed, the stale cache entry must be dropped.
2019 if old_rpath != new_rpath:
2020 DevCacheManager.RemoveCache(old_rpath)
2021 # FIXME: we should add the new cache information here, like:
2022 # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2023 # but we don't have the owner here - maybe parse from existing
2024 # cache? for now, we only lose lvm data when we rename, which
2025 # is less critical than DRBD or MD
2026 except errors.BlockDeviceError, err:
2027 msgs.append("Can't rename device '%s' to '%s': %s" %
2028 (dev, unique_id, err))
2029 logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2031 return (result, "; ".join(msgs))
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# assignment of `cfg` used at 2049); code kept byte-for-byte below.
2034 def _TransformFileStorageDir(file_storage_dir):
2035 """Checks whether given file_storage_dir is valid.
2037 Checks whether the given file_storage_dir is within the cluster-wide
2038 default file_storage_dir stored in SimpleStore. Only paths under that
2039 directory are allowed.
2041 @type file_storage_dir: str
2042 @param file_storage_dir: the path to check
2044 @return: the normalized path if valid, None otherwise
2048 file_storage_dir = os.path.normpath(file_storage_dir)
2049 base_file_storage_dir = cfg.GetFileStorageDir()
# commonprefix check rejects paths escaping the configured base dir.
2050 if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2051 base_file_storage_dir):
2052 logging.error("file storage directory '%s' is not under base file"
2053 " storage directory '%s'",
2054 file_storage_dir, base_file_storage_dir)
2056 return file_storage_dir
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# try: opening the except at 2082); code kept byte-for-byte below.
2059 def CreateFileStorageDir(file_storage_dir):
2060 """Create file storage directory.
2062 @type file_storage_dir: str
2063 @param file_storage_dir: directory to create
2066 @return: tuple with first element a boolean indicating whether dir
2067 creation was successful or not
2070 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2072 if not file_storage_dir:
2075 if os.path.exists(file_storage_dir):
2076 if not os.path.isdir(file_storage_dir):
2077 logging.error("'%s' is not a directory", file_storage_dir)
# 0750: owner rwx, group rx, others none (Python 2 octal literal).
2081 os.makedirs(file_storage_dir, 0750)
2082 except OSError, err:
2083 logging.error("Cannot create file storage directory '%s': %s",
2084 file_storage_dir, err)
# NOTE(review): excerpt elides original lines (numbering gaps); code kept
# byte-for-byte below.
2089 def RemoveFileStorageDir(file_storage_dir):
2090 """Remove file storage directory.
2092 Remove it only if it's empty. If not log an error and return.
2094 @type file_storage_dir: str
2095 @param file_storage_dir: the directory we should cleanup
2096 @rtype: tuple (success,)
2097 @return: tuple of one element, C{success}, denoting
2098 whether the operation was successful
2101 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2103 if not file_storage_dir:
2106 if os.path.exists(file_storage_dir):
2107 if not os.path.isdir(file_storage_dir):
2108 logging.error("'%s' is not a directory", file_storage_dir)
2110 # deletes dir only if empty, otherwise we want to return False
2112 os.rmdir(file_storage_dir)
2113 except OSError, err:
2114 logging.exception("Cannot remove file storage directory '%s'",
# NOTE(review): excerpt elides original lines (numbering gaps); code kept
# byte-for-byte below.
2120 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2121 """Rename the file storage directory.
2123 @type old_file_storage_dir: str
2124 @param old_file_storage_dir: the current path
2125 @type new_file_storage_dir: str
2126 @param new_file_storage_dir: the name we should rename to
2127 @rtype: tuple (success,)
2128 @return: tuple of one element, C{success}, denoting
2129 whether the operation was successful
# Both endpoints must be valid (i.e. under the cluster's base file
# storage directory) before attempting the rename.
2132 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2133 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2135 if not old_file_storage_dir or not new_file_storage_dir:
2138 if not os.path.exists(new_file_storage_dir):
2139 if os.path.isdir(old_file_storage_dir):
2141 os.rename(old_file_storage_dir, new_file_storage_dir)
2142 except OSError, err:
2143 logging.exception("Cannot rename '%s' to '%s'",
2144 old_file_storage_dir, new_file_storage_dir)
2147 logging.error("'%s' is not a directory", old_file_storage_dir)
2150 if os.path.exists(old_file_storage_dir):
2151 logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2152 old_file_storage_dir, new_file_storage_dir)
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# condition guarding the error log at 2170); code kept byte-for-byte.
2157 def _IsJobQueueFile(file_name):
2158 """Checks whether the given filename is in the queue directory.
2160 @type file_name: str
2161 @param file_name: the file name we should check
2163 @return: whether the file is under the queue directory
# Same commonprefix technique as _TransformFileStorageDir: the path
# must lie under the (normalized) queue directory.
2166 queue_dir = os.path.normpath(constants.QUEUE_DIR)
2167 result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2170 logging.error("'%s' is not a file in the queue directory",
# NOTE(review): excerpt elides original lines (numbering gaps); code kept
# byte-for-byte below.
2176 def JobQueueUpdate(file_name, content):
2177 """Updates a file in the queue directory.
2179 This is just a wrapper over L{utils.WriteFile}, with proper
2182 @type file_name: str
2183 @param file_name: the job file name
2185 @param content: the new job contents
2187 @return: the success of the operation
2190 if not _IsJobQueueFile(file_name):
2193 # Write and replace the file atomically
2194 utils.WriteFile(file_name, data=_Decompress(content))
# NOTE(review): excerpt elides original lines (numbering gaps); code kept
# byte-for-byte below.
2199 def JobQueueRename(old, new):
2200 """Renames a job queue file.
2202 This is just a wrapper over os.rename with proper checking.
2205 @param old: the old (actual) file name
2207 @param new: the desired file name
2209 @return: the success of the operation
# Both names must live inside the queue directory to be acceptable.
2212 if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2215 utils.RenameFile(old, new, mkdir=True)
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# if/else around 2233/2235); code kept byte-for-byte below.
2220 def JobQueueSetDrainFlag(drain_flag):
2221 """Set the drain flag for the queue.
2223 This will set or unset the queue drain flag.
2225 @type drain_flag: boolean
2226 @param drain_flag: if True, will set the drain flag, otherwise reset it.
2228 @return: always True
2229 @warning: the function always returns True
# The drain state is represented by the mere existence of the file.
2233 utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2235 utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# rd.Close() call before the except at 2268); code kept byte-for-byte.
2240 def BlockdevClose(instance_name, disks):
2241 """Closes the given block devices.
2243 This means they will be switched to secondary mode (in case of
2246 @param instance_name: if the argument is not empty, the symlinks
2247 of this instance will be removed
2248 @type disks: list of L{objects.Disk}
2249 @param disks: the list of disks to be closed
2250 @rtype: tuple (success, message)
2251 @return: a tuple of success and message, where success
2252 indicates the success of the operation, and message
2253 which will contain the error details in case we
2259 rd = _RecursiveFindBD(cf)
2261 _Fail("Can't find device %s", cf)
2268 except errors.BlockDeviceError, err:
2269 msg.append(str(err))
2271 return (False, "Can't make devices secondary: %s" % ",".join(msg))
# Only drop the instance's device symlinks when a name was given.
2274 _RemoveBlockDevLinks(instance_name, disks)
2275 return (True, "All devices secondary")
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# try: opening the except at 2296); code kept byte-for-byte below.
2278 def ValidateHVParams(hvname, hvparams):
2279 """Validates the given hypervisor parameters.
2281 @type hvname: string
2282 @param hvname: the hypervisor name
2283 @type hvparams: dict
2284 @param hvparams: the hypervisor parameters to be validated
2285 @rtype: tuple (success, message)
2286 @return: a tuple of success and message, where success
2287 indicates the success of the operation, and message
2288 which will contain the error details in case we
# Delegate validation to the hypervisor class itself; hypervisor
# errors are reported as a (False, message) result, not an RPC failure.
2293 hv_type = hypervisor.GetHypervisor(hvname)
2294 hv_type.ValidateParameters(hvparams)
2295 return (True, "Validation passed")
2296 except errors.HypervisorError, err:
2297 return (False, str(err))
# NOTE(review): the def line of this demote-from-master-candidate
# function is elided from this excerpt (numbering gap before 2301);
# body kept byte-for-byte below.
2301 """Demotes the current node from master candidate role.
2304 # try to ensure we're not the master by mistake
2305 master, myself = ssconf.GetMasterAndMyself()
2306 if master == myself:
2307 return (False, "ssconf status shows I'm the master node, will not demote")
# Refuse to demote while a master daemon is running locally.
2308 pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2309 if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2310 return (False, "The master daemon is running, will not demote")
2312 utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2313 except EnvironmentError, err:
# A missing cluster config file is fine (nothing to back up); any
# other error aborts the demotion.
2314 if err.errno != errno.ENOENT:
2315 return (False, "Error while backing up cluster file: %s" % str(err))
2316 utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2317 return (True, "Done")
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# loop header over the disks and the bdevs accumulator); code kept
# byte-for-byte below.
2320 def _FindDisks(nodes_ip, disks):
2321 """Sets the physical ID on disks and returns the block devices.
2324 # set the correct physical ID
2325 my_name = utils.HostInfo().name
2327 cf.SetPhysicalID(my_name, nodes_ip)
2332 rd = _RecursiveFindBD(cf)
2334 return (False, "Can't find device %s" % cf)
2336 return (True, bdevs)
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# loop calling DisconnectNet before the except at 2351); code kept
# byte-for-byte below.
2339 def DrbdDisconnectNet(nodes_ip, disks):
2340 """Disconnects the network on a list of drbd devices.
# On lookup failure _FindDisks' (False, message) result is propagated.
2343 status, bdevs = _FindDisks(nodes_ip, disks)
2345 return status, bdevs
2351 except errors.BlockDeviceError, err:
2352 _Fail("Can't change network configuration to standalone mode: %s",
2354 return (True, "All disks are now disconnected")
# NOTE(review): excerpt elides original lines (numbering gaps, e.g. the
# multimaster condition before 2366 and the loop over bdevs inside the
# wait loop); code kept byte-for-byte below.
2357 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2358 """Attaches the network on a list of drbd devices.
2361 status, bdevs = _FindDisks(nodes_ip, disks)
2363 return status, bdevs
2366 for idx, rd in enumerate(bdevs):
2368 _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2369 except EnvironmentError, err:
2370 _Fail("Can't create symlink: %s", err)
2371 # reconnect disks, switch to new master configuration and if
2372 # needed primary mode
2375 rd.AttachNet(multimaster)
2376 except errors.BlockDeviceError, err:
2377 _Fail("Can't change network configuration: %s", err)
2378 # wait until the disks are connected; we need to retry the re-attach
2379 # if the device becomes standalone, as this might happen if the one
2380 # node disconnects and reconnects in a different mode before the
2381 # other node reconnects; in this case, one or both of the nodes will
2382 # decide it has wrong configuration and switch to standalone
2383 RECONNECT_TIMEOUT = 2 * 60
2384 sleep_time = 0.100 # start with 100 milliseconds
2385 timeout_limit = time.time() + RECONNECT_TIMEOUT
# Poll /proc/drbd status with exponential backoff (capped at 5s) until
# every device is connected/resyncing or the timeout expires.
2386 while time.time() < timeout_limit:
2387 all_connected = True
2389 stats = rd.GetProcStatus()
2390 if not (stats.is_connected or stats.is_in_resync):
2391 all_connected = False
2392 if stats.is_standalone:
2393 # peer had different config info and this node became
2394 # standalone, even though this should not happen with the
2395 # new staged way of changing disk configs
2397 rd.ReAttachNet(multimaster)
2398 except errors.BlockDeviceError, err:
2399 _Fail("Can't change network configuration: %s", err)
2402 time.sleep(sleep_time)
2403 sleep_time = min(5, sleep_time * 1.5)
2404 if not all_connected:
2405 return (False, "Timeout in disk reconnecting")
2407 # change to primary mode
2411 except errors.BlockDeviceError, err:
2412 _Fail("Can't change to primary mode: %s", err)
2414 msg = "multi-master and primary"
2416 msg = "single-master"
2417 return (True, "Disks are now configured as %s" % msg)
# RPC entry point: check/wait for DRBD resync status on a list of devices.
# Returns (success, (alldone, min_resync)) where min_resync is the minimum
# sync percentage seen across the devices.
2420 def DrbdWaitSync(nodes_ip, disks):
2421 """Wait until DRBDs have synchronized.
2424 status, bdevs = _FindDisks(nodes_ip, disks)
2426 return status, bdevs
# NOTE(review): initialization of "alldone"/"min_resync"/"failure" and the
# per-device loop header (~lines 2427-2431) are elided in this excerpt.
2432 stats = rd.GetProcStatus()
# A device that is neither connected nor resyncing counts as a failure.
2433 if not (stats.is_connected or stats.is_in_resync):
2436 alldone = alldone and (not stats.is_in_resync)
2437 if stats.sync_percent is not None:
2438 min_resync = min(min_resync, stats.sync_percent)
2439 return (not failure, (alldone, min_resync))
# RPC entry point: schedule a hard powercycle of this node via the
# hypervisor.  Returns immediately so the RPC answer reaches the master
# before the node goes down.
2442 def PowercycleNode(hypervisor_type):
2443 """Hard-powercycle the node.
2445 Because we need to return first, and schedule the powercycle in the
2446 background, we won't be able to report failures nicely.
2449 hyper = hypervisor.GetHypervisor(hypervisor_type)
# NOTE(review): the fork call (~lines 2450-2451) is elided; on fork failure
# we fall through and run the powercycle in this process.
2452 except OSError, err:
2453 # if we can't fork, we'll pretend that we're in the child process
# NOTE(review): the "if pid > 0:" parent-branch guard (~lines 2454-2455) is
# elided; the parent returns this tuple while the child sleeps then reboots.
2456 return (True, "Reboot scheduled in 5 seconds")
2458 hyper.PowercycleNode()
# Runs the hook scripts found under the hooks base directory on this node.
2461 class HooksRunner(object):
2464 This class is instantiated on the node side (ganeti-noded) and not
# Only files whose names match this mask (alnum, underscore, dash) are run.
2468 RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2470 def __init__(self, hooks_base_dir=None):
2471 """Constructor for hooks runner.
2473 @type hooks_base_dir: str or None
2474 @param hooks_base_dir: if not None, this overrides the
2475 L{constants.HOOKS_BASE_DIR} (useful for unittests)
2478 if hooks_base_dir is None:
2479 hooks_base_dir = constants.HOOKS_BASE_DIR
2480 self._BASE_DIR = hooks_base_dir
# Runs a single hook script with the given environment; returns
# (success, safe-encoded output).  NOTE(review): the @staticmethod
# decorator line (~2482) is elided in this excerpt.
2483 def ExecHook(script, env):
2484 """Exec one hook script.
2487 @param script: the full path to the script
2489 @param env: the environment with which to exec the script
2490 @rtype: tuple (success, message)
2491 @return: a tuple of success and message, where success
2492 indicates the succes of the operation, and message
2493 which will contain the error details in case we
2497 # exec the process using subprocess and log the output
# stdin is /dev/null; stdout+stderr are captured together via a pipe.
2500 fdstdin = open("/dev/null", "r")
2501 child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2502 stderr=subprocess.STDOUT, close_fds=True,
2503 shell=False, cwd="/", env=env)
# Output is capped at 4096 bytes from the script.
2506 output = child.stdout.read(4096)
2507 child.stdout.close()
2508 except EnvironmentError, err:
2509 output += "Hook script error: %s" % str(err)
# Retry wait() on EINTR; other environment errors are handled in the
# elided surrounding code (~lines 2510-2519).
2513 result = child.wait()
2515 except EnvironmentError, err:
2516 if err.errno == errno.EINTR:
2520 # try not to leak fds
2521 for fd in (fdstdin, ):
2525 except EnvironmentError, err:
2526 # just log the error
2527 #logging.exception("Error while closing fd %s", fd)
# Exit code 0 means success; output is sanitized before returning.
2530 return result == 0, utils.SafeEncode(output.strip())
# Runs all executable, mask-matching scripts in "<hpath>-<phase>.d",
# sorted by name (00name-style ordering), and collects per-script results.
2532 def RunHooks(self, hpath, phase, env):
2533 """Run the scripts in the hooks directory.
2536 @param hpath: the path to the hooks directory which
2539 @param phase: either L{constants.HOOKS_PHASE_PRE} or
2540 L{constants.HOOKS_PHASE_POST}
2542 @param env: dictionary with the environment for the hook
2544 @return: list of 3-element tuples:
2546 - script result, either L{constants.HKR_SUCCESS} or
2547 L{constants.HKR_FAIL}
2548 - output of the script
2550 @raise errors.ProgrammerError: for invalid input
# Map the phase constant to the directory suffix; the suffix assignments
# (~lines 2555, 2557) are elided in this excerpt.
2554 if phase == constants.HOOKS_PHASE_PRE:
2556 elif phase == constants.HOOKS_PHASE_POST:
2559 raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2562 subdir = "%s-%s.d" % (hpath, suffix)
2563 dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2565 dir_contents = utils.ListVisibleFiles(dir_name)
2566 except OSError, err:
2567 # FIXME: must log output in case of failures
2570 # we use the standard python sort order,
2571 # so 00name is the recommended naming scheme
2573 for relname in dir_contents:
2574 fname = os.path.join(dir_name, relname)
# Skip non-files, non-executables and names failing the mask.
2575 if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2576 self.RE_MASK.match(relname) is not None):
2577 rrval = constants.HKR_SKIP
2580 result, output = self.ExecHook(fname, env)
2582 rrval = constants.HKR_FAIL
2584 rrval = constants.HKR_SUCCESS
2585 rr.append(("%s/%s" % (subdir, relname), rrval, output))
# Finds and executes an instance-allocator script, feeding it the input
# data through a temporary file.
2590 class IAllocatorRunner(object):
2591 """IAllocator runner.
2593 This class is instantiated on the node side (ganeti-noded) and not on
2597 def Run(self, name, idata):
2598 """Run an iallocator script.
2601 @param name: the iallocator script name
2603 @param idata: the allocator input data
2606 @return: four element tuple of:
2607 - run status (one of the IARUN_ constants)
2610 - fail reason (as from L{utils.RunResult})
# Locate the script on the iallocator search path; the extra FindFile
# arguments (~line 2614) are elided in this excerpt.
2613 alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2615 if alloc_script is None:
2616 return (constants.IARUN_NOTFOUND, None, None, None)
# Input data is written to a temp file passed as the script's argument;
# the write/close and the try/finally cleanup (~lines 2619-2628) are
# elided in this excerpt.
2618 fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2622 result = utils.RunCmd([alloc_script, fin_name])
2624 return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2629 return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
# File-based cache of block-device ownership/state info, one small file per
# device under the bdev cache directory.  All failures are logged, never
# raised -- cache maintenance is best-effort.
2632 class DevCacheManager(object):
2633 """Simple class for managing a cache of block device information.
2636 _DEV_PREFIX = "/dev/"
2637 _ROOT_DIR = constants.BDEV_CACHE_DIR
# NOTE(review): the @classmethod decorator lines for the three methods
# below (~2639, 2658, 2692) are elided in this excerpt.
2640 def _ConvertPath(cls, dev_path):
2641 """Converts a /dev/name path to the cache file name.
2643 This replaces slashes with underscores and strips the /dev
2644 prefix. It then returns the full path to the cache file.
2647 @param dev_path: the C{/dev/} path name
2649 @return: the converted path name
2652 if dev_path.startswith(cls._DEV_PREFIX):
2653 dev_path = dev_path[len(cls._DEV_PREFIX):]
2654 dev_path = dev_path.replace("/", "_")
2655 fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2659 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2660 """Updates the cache information for a given device.
2663 @param dev_path: the pathname of the device
2665 @param owner: the owner (instance name) of the device
2666 @type on_primary: bool
2667 @param on_primary: whether this is the primary
2670 @param iv_name: the instance-visible name of the
2671 device, as in objects.Disk.iv_name
# A None dev_path is only logged, not an error to the caller.
2676 if dev_path is None:
2677 logging.error("DevCacheManager.UpdateCache got a None dev_path")
2679 fpath = cls._ConvertPath(dev_path)
# NOTE(review): derivation of "state" from on_primary (~lines 2680-2684)
# is elided in this excerpt.
2685 iv_name = "not_visible"
# Cache file format: "<owner> <state> <iv_name>\n".
2686 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2688 utils.WriteFile(fpath, data=fdata)
2689 except EnvironmentError, err:
2690 logging.exception("Can't update bdev cache for %s", dev_path)
2693 def RemoveCache(cls, dev_path):
2694 """Remove data for a dev_path.
2696 This is just a wrapper over L{utils.RemoveFile} with a converted
2697 path name and logging.
2700 @param dev_path: the pathname of the device
2705 if dev_path is None:
2706 logging.error("DevCacheManager.RemoveCache got a None dev_path")
2708 fpath = cls._ConvertPath(dev_path)
2710 utils.RemoveFile(fpath)
2711 except EnvironmentError, err:
2712 logging.exception("Can't update bdev cache for %s", dev_path)