4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon"""
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
48 """Simple wrapper to return a SimpleStore.
50 @rtype: L{ssconf.SimpleStore}
51 @return: a SimpleStore instance
54 return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Build an SshRunner bound to the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a freshly constructed SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  # NOTE: the previous mutable default argument ([]) was replaced by
  # None to avoid sharing one list object across all calls
  if exclude is None:
    exclude = []
  if not os.path.isdir(path):
    return
  # Normalize excluded paths so the membership test below matches
  exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      # excluded entries are kept
      continue
    # only plain files are removed; directories and symlinks are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
96 """Removes job queue files and archived jobs.
101 _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
102 _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
106 """Returns master information.
108 This is an utility function to compute master information, either
109 for consumption here or from the node daemon.
112 @return: (master_netdev, master_ip, master_name) if we have a good
113 configuration, otherwise (None, None, None)
118 master_netdev = cfg.GetMasterNetdev()
119 master_ip = cfg.GetMasterIP()
120 master_node = cfg.GetMasterNode()
121 except errors.ConfigurationError, err:
122 logging.exception("Cluster configuration incomplete")
123 return (None, None, None)
124 return (master_netdev, master_ip, master_node)
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:

  # check whether the master IP already answers on the noded port
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
      logging.error("Someone else has the master ip, not activating")
    # add the master IP as a /32 alias label on the master netdev
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
      logging.error("Can't activate master IP: %s", result.output)

    # gratuitous ARP so neighbours update their caches quickly
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  for daemon in 'ganeti-masterd', 'ganeti-rapi':
    result = utils.RunCmd([daemon])
      logging.error("Can't start daemon %s: %s", daemon, result.output)
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:

  # remove the /32 alias added by StartMaster
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
    logging.error("Can't remove the master IP, error: %s", result.output)
  # but otherwise ignore the failure

  # stop/kill the rapi and the master daemon
  for daemon in constants.RAPI_PID, constants.MASTERD_PID:
    utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: boolean
  @return: the success of the operation

  """
  # sshd host keys, with the permission each file needs
  sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
               (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
               (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
               (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
  except errors.OpExecError, err:
    logging.exception("Error while processing user ssh files")

  # private key is only readable by the owner
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # restart sshd so the new host keys take effect
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
254 """Cleans up and remove the current node.
256 This function cleans up and prepares the current node to be removed
259 If processing is successful, then it raises an
260 L{errors.GanetiQuitException} which is used as a special case to
261 shutdown the node daemon.
264 _CleanDirectory(constants.DATA_DIR)
268 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
269 except errors.OpExecError:
270 logging.exception("Error while processing ssh files")
273 f = open(pub_key, 'r')
275 utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
279 utils.RemoveFile(priv_key)
280 utils.RemoveFile(pub_key)
282 # Return a reassuring string to the caller, and quit
283 raise errors.QuitGanetiException(False, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different informations about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  # disk space information from LVM
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  # memory/cpu information as reported by the hypervisor, if any
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # the boot id uniquely identifies the current boot of this node
  # NOTE(review): the file handle is not closed in the visible code —
  # confirm cleanup (try/finally) upstream
  f = open("/proc/sys/kernel/random/boot_id", 'r')
  outputarray["bootid"] = f.read(128).rstrip("\n")
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  they report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: dict
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: str
  @param cluster_name: the cluster name, used for the ssh checks
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  if constants.NV_HYPERVISOR in what:
    # run the per-hypervisor verification
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # shuffle so the ssh checks don't always hit the nodes in the same order
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # locate our own primary/secondary IP in the node list
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if not utils.TcpPing(sip, port, source=my_sip):
          fail.append("secondary")
        tmp[name] = ("failure using the %s interface(s)" %

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = ListVolumeGroups()

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = constants.PROTOCOL_VERSION

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
    logging.error("Failed to list logical volumes, lvs output: %s",

  # expected format: name|size|six-char lv_attr string
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    match = valid_line_re.match(line)
      logging.error("Invalid line returned from lvs output: '%s'", line)
    name, size, attr = match.groups()
    # lv_attr: char index 4 is the state flag, index 5 the open flag
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  vg_info = utils.ListVolumeGroups()
  return vg_info
468 """List all volumes on this node.
472 A list of dictionaries, each having four keys:
473 - name: the logical volume name,
474 - size: the size of the logical volume
475 - dev: the physical device on which the LV lives
476 - vg: the volume group to which it belongs
478 In case of errors, we return an empty list and log the
481 Note that since a logical volume can live on multiple physical
482 volumes, the resulting list might include a logical volume
486 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
488 "--options=lv_name,lv_size,devices,vg_name"])
490 logging.error("Failed to list logical volumes, lvs output: %s",
496 return dev.split('(')[0]
502 'name': line[0].strip(),
503 'size': line[1].strip(),
504 'dev': parse_dev(line[2].strip()),
505 'vg': line[3].strip(),
508 return [map_line(line.split('|')) for line in result.stdout.splitlines()
509 if line.count('|') >= 3]
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: the list of bridge names to check for
  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      # one missing bridge is enough to fail the whole check
      return False
  # every bridge was found (also True for an empty list)
  return True
526 def GetInstanceList(hypervisor_list):
527 """Provides a list of instances.
529 @type hypervisor_list: list
530 @param hypervisor_list: the list of hypervisors to query information
533 @return: a list of all running instances on the current node
534 - instance1.example.com
535 - instance2.example.com
539 for hname in hypervisor_list:
541 names = hypervisor.GetHypervisor(hname).ListInstances()
542 results.extend(names)
543 except errors.HypervisorError, err:
544 logging.exception("Error enumerating instances for hypevisor %s", hname)
545 # FIXME: should we somehow not propagate this to the master?
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance
  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # iinfo layout: presumably (name, id, memory, vcpus, state, time) —
    # indices below match that; confirm against the hypervisor API
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data
  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    for name, inst_id, memory, vcpus, state, times in iinfo:
        # an instance reported by two hypervisors must agree on its data
        if name in output and output[name] != value:
          raise errors.HypervisorError("Instance %s running duplicate"
                                       " with different parameters" % name)
def AddOSToInstance(instance):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance)

  # per-run log file for the OS create script output
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)

    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  # environment for the rename script, plus the previous name
  rename_env = OSEnvironment(instance)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  # per-run log file for the OS rename script output
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # message fixed: this is the rename command, not the create one
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    return False

  return True
def _GetVGInfo(vg_name):
  """Get informations about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
      A dictionary with the following keys:
          - C{vg_size} is the total size of the volume group in MiB
          - C{vg_free} is the free size of the volume group in MiB
          - C{pv_count} are the number of physical disks in that VG

      If an error occurs during gathering of data, we return the same dict
      with keys all set to None.

  """
  # pre-fill with None so every error path returns a uniform dict
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

    logging.error("volume group %s not present", vg_name)
  # strip the trailing separator vgs appends before splitting the fields
  valarr = retval.stdout.strip().rstrip(':').split(':')
      "vg_size": int(round(float(valarr[0]), 0)),
      "vg_free": int(round(float(valarr[1]), 0)),
      "pv_count": int(valarr[2]),
  except ValueError, err:
    logging.exception("Fail to parse vgs output")
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list of L{bdev.BlockDev}
  @return: list of the block devices

  """
  for disk in instance.disks:
    device = _RecursiveFindBD(disk)
      # a disk that cannot be found is a hard error at startup
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
    block_devices.append((disk, device))
def StartInstance(instance, extra_args):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: boolean
  @return: whether the startup was successful or not

  """
  running_instances = GetInstanceList([instance.hypervisor])

  # starting an already-running instance is a no-op
  if instance.name in running_instances:

  block_devices = _GatherBlockDevs(instance)
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

    hyper.StartInstance(instance, block_devices, extra_args)
  except errors.HypervisorError, err:
    logging.exception("Failed to start instance")
def ShutdownInstance(instance):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: boolean
  @return: whether the shutdown was successful or not

  """
  hv_name = instance.hypervisor
  running_instances = GetInstanceList([hv_name])

  # shutting down an already-stopped instance is a no-op
  if instance.name not in running_instances:

  # first attempt: a clean stop via the hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logging.error("Failed to stop instance")

  # test every 10secs for 2min

  for dummy in range(11):
    if instance.name not in GetInstanceList([hv_name]):
    # the shutdown did not succeed
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)

    # second attempt: forced destroy
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      logging.exception("Failed to stop instance")

    if instance.name in GetInstanceList([hv_name]):
      logging.error("could not shutdown instance '%s' even by destroy",
def RebootInstance(instance, reboot_type, extra_args):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
      constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL})
        is not accepted here, since that mode is handled
        differently
  @rtype: boolean
  @return: the success of the operation

  """
  running_instances = GetInstanceList([instance.hypervisor])

  # a stopped instance cannot be rebooted
  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      logging.exception("Failed to soft reboot instance")
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
      # hard reboot == stop + start through our own wrappers
      ShutdownInstance(instance)
      StartInstance(instance, extra_args)
    except errors.HypervisorError, err:
      logging.exception("Failed to hard reboot instance")
    raise errors.ParameterError("reboot_type invalid")
858 def MigrateInstance(instance, target, live):
859 """Migrates an instance to another node.
861 @type instance: L{objects.Instance}
862 @param instance: the instance definition
864 @param target: the target node name
866 @param live: whether the migration should be done live or not (the
867 interpretation of this parameter is left to the hypervisor)
869 @return: a tuple of (success, msg) where:
870 - succes is a boolean denoting the success/failure of the operation
871 - msg is a string with details in case of failure
874 hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
877 hyper.MigrateInstance(instance.name, target, live)
878 except errors.HypervisorError, err:
879 msg = "Failed to migrate instance: %s" % str(err)
882 return (True, "Migration successfull")
def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # assemble the children first, recursively
  for child in disk.children:
    crdev = _RecursiveAssembleBD(child, owner, on_primary)
    if on_primary or disk.AssembleOnSecondary():
      # we need the children open in case the device itself has to
  device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
  if device is not None:
    # the device exists already — remove it before re-creating
    logging.info("removing existing device %s", disk)
    except errors.BlockDeviceError, err:
  device = bdev.Create(disk.dev_type, disk.physical_id,
  raise ValueError("Can't create child device for %s, %s" %
  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation"
      logging.error(errorstring)
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                    " daemon logs" % errorstring)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)
    # record the device in the cache for later lookups
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  # the physical id may only be known after creation
  physical_id = device.unique_id
def RemoveBlockDevice(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
    # since we are removing the device, allow a partial match
    # this allows removal of broken mirrors
    rdev = _RecursiveFindBD(disk, allow_partial=True)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
  r_path = rdev.dev_path
  result = rdev.Remove()
    # drop the cache entry once the device is gone
    DevCacheManager.RemoveCache(r_path)

  # remove the children as well, recursively
  for child in disk.children:
    result = result and RemoveBlockDevice(child)
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
    mcn = disk.ChildrenNeeded()
      mcn = 0 # max number of Nones allowed
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # tolerate missing children up to the mcn budget (broken mirrors)
        if children.count(None) >= mcn:
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    if as_primary or disk.OpenOnSecondary():
    # record the assembled device for later lookups
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  result = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(result, bdev.BlockDev):
    # on the primary we get a device object back; callers want its path
    result = result.dev_path
  # the previous version dropped the computed value; return it to callers
  return result
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can L{Attach()}), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: boolean
  @return: the success of the operation

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
      # drop the cache entry for the now-inactive device
      DevCacheManager.RemoveCache(r_path)

  # shut down the children regardless of the parent's state
  for child in disk.children:
    result = result and ShutdownBlockDevice(child)
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: boolean
  @return: the success of the operation

  """
  # allow a partial match: the mirror may be degraded, which is exactly
  # why children are being added
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent_bdev is None:
    logging.error("Can't find parent device")
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  # every new child must resolve to an existing device
  if new_bdevs.count(None) > 0:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
  # resolve each child to a /dev path: statically if possible,
  # otherwise by finding the assembled device
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
      bd = _RecursiveFindBD(disk)
        logging.error("Can't find dynamic device %s while removing children",
      devs.append(bd.dev_path)
  parent_bdev.RemoveChildren(devs)
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDevice.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
    rbd = _RecursiveFindBD(dsk)
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
    stats.append(rbd.CombinedSyncStatus())
def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return informations about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find
  @type allow_partial: boolean
  @param allow_partial: if true, don't abort the find if a
      child of the device can't be found; this is intended
      to be used when repairing mirrors

  @return: None if the device can't be found,
      otherwise the device instance

  """
  # NOTE(review): allow_partial is not forwarded to the recursive call
  # below — confirm this is intentional
  for chdisk in disk.children:
    children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
def FindBlockDevice(disk):
  """Check if a device is activated.

  If it is, return informations about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or tuple
  @return: None if the disk cannot be found, otherwise a
      tuple (device_path, major, minor, sync_percent,
      estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  # flatten the device data plus its sync status into one tuple
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: boolean
  @return: the success of the operation; errors are logged
      in the node daemon log

  """
  # reject relative paths outright
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",

  # whitelist of files the master is allowed to overwrite
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Thin wrapper around L{ssconf.SimpleStore.WriteFiles}.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
1255 def _ErrnoOrStr(err):
1256 """Format an EnvironmentError exception.
1258 If the L{err} argument has an errno attribute, it will be looked up
1259 and converted into a textual C{E...} description. Otherwise the
1260 string representation of the error will be returned.
1262 @type err: L{EnvironmentError}
1263 @param err: the exception to format
1266 if hasattr(err, 'errno'):
1267 detail = errno.errorcode[err.errno]
def _OSOndiskVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS given by
  the 'name' parameter and residing in the 'os_dir' directory.

  @type name: str
  @param name: the OS name we should look for
  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: int or None
  @return:
      Either an integer denoting the version or None in the
      case when this is not a valid OS name.
  @raise errors.InvalidOS: if the OS cannot be found

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  # the version file must be a plain regular file
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"

    api_versions = f.readlines()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  # one version per line; strip whitespace before converting
  api_versions = [version.strip() for version in api_versions]
    api_versions = [int(version) for version in api_versions]
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.
  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: an OS object for each name in all the given
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        # A directory we cannot list is logged and skipped, not fatal.
        logging.exception("Can't list the OS directory %s", dir_name)
      for name in f_names:
          os_inst = OSFromDisk(name, base_dir=dir_name)
          result.append(os_inst)
        except errors.InvalidOS, err:
          # Invalid OSes are still reported (as OS objects built from
          # the error) so the master can present the diagnosis.
          result.append(objects.OS.FromInvalidOS(err))
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.
  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{errors.InvalidOS} exception, detailing why this is not a valid OS.
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise errors.InvalidOS: if we don't find a valid OS
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    raise errors.InvalidOS(name, None, "OS dir not found in search path")
    os_dir = os.path.sep.join([base_dir, name])
  api_versions = _OSOndiskVersion(name, os_dir)
  # The node must support the exact API version the cluster expects.
  if constants.OS_API_VERSION not in api_versions:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_versions, constants.OS_API_VERSION))
  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))
    # Each script must be executable by its owner ...
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
    # ... and a regular file.
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
                    api_versions=api_versions)
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.
  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
  result['INSTANCE_NAME'] = instance.name
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  # One set of DISK_<n>_* variables per disk, in instance disk order.
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    # FIXME: When disks will have read-only mode, populate this
    result['DISK_%d_ACCESS' % idx] = 'W'
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      # NOTE(review): physical_id[0] is presumably the file driver for
      # file-based disks - confirm against objects.Disk.
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  # One set of NIC_<n>_* variables per NIC, in instance NIC order.
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]
def GrowBlockDevice(disk, amount):
  """Grow a stack of block devices.
  This function is called recursively, with the children being the
  first ones to resize.
  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @param amount: the amount to grow with (presumably MiB - TODO confirm
      units against the caller)
  @rtype: (status, result)
  @return: a tuple with the status of the operation
      (True/False), and the errors message if status
  r_dev = _RecursiveFindBD(disk)
    return False, "Cannot find block device %s" % (disk,)
  except errors.BlockDeviceError, err:
    # Report the failure as data instead of letting it propagate.
    return False, str(err)
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.
  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.
  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @return: snapshot disk path
  if len(disk.children) == 1:
    # only one child, let's recurse on it
    return SnapshotBlockDevice(disk.children[0])
  # more than one child, choose one that matches
  for child in disk.children:
    if child.size == disk.size:
      # return implies breaking the loop
      return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    # Leaf LVM device: this is where the snapshot is actually taken.
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
  # Anything else (non-LVM leaf) is a programming error, not a user error.
  raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                               " '%s' of type '%s'" %
                               (disk.unique_id, disk.dev_type))
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
  """Export a block device snapshot to a remote node.
  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to whom the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @return: the success of the operation
  export_env = OSEnvironment(instance)
  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script
  # Per-export log file, named after OS, instance and timestamp.
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)
  # The export is staged into "<instance>.new" and renamed by
  # FinalizeExport once everything succeeded.
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]
  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
                               export_script, logfile)
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
  result = utils.RunCmd(command, env=export_env)
  logging.error("os snapshot export command '%s' returned error: %s"
                " output: %s", command, result.fail_reason, result.output)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.
  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file
  @return: the success of the operation
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
  config = objects.SerializableConfigParser()
  # Export-level metadata section.
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
  # Instance-level metadata section.
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
  # TODO: redundant: on load can read nics until it doesn't exist
  # NOTE(review): after the loop, 'nic_count' is the LAST index (n-1), not
  # the number of NICs, and it needs an initializer for the zero-NIC case;
  # verify against the import side before changing.
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # Atomically promote the staged "<name>.new" directory, replacing any
  # previous export of the same instance.
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.
  @param dest: directory containing the export
  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
  config = objects.SerializableConfigParser()
  # A valid export must carry both the export-level and the
  # instance-level sections.
  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
  """Import an os image into an instance.
  @type instance: L{objects.Instance}
  @param instance: instance to import the disks into
  @type src_node: string
  @param src_node: source node for the disk images
  @type src_images: list of string
  @param src_images: absolute paths of the disk images
  @rtype: list of boolean
  @return: each boolean represent the success of importing the n-th disk
  import_env = OSEnvironment(instance)
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
                               import_script, logfile)
  # Each disk image is imported independently: the source node streams
  # the image over SSH, it is decompressed and fed to the OS import
  # script. Failures are recorded per disk, not fatal for the rest.
  for idx, image in enumerate(src_images):
    destcmd = utils.BuildShellCmd('cat %s', image)
    remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                     constants.GANETI_RUNAS,
    command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
    import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
    import_env['IMPORT_INDEX'] = str(idx)
    result = utils.RunCmd(command, env=import_env)
    logging.error("Disk import command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason,
    final_result.append(False)
    final_result.append(True)
    final_result.append(True)
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  # Every visible entry under EXPORT_DIR is one export.
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
def RemoveExport(export):
  """Remove an existing export from the node.
  @param export: the name of the export to remove
  @return: the success of the operation
  target = os.path.join(constants.EXPORT_DIR, export)
  # NOTE(review): rmtree raises if the target is missing or not
  # removable; see the TODO below.
  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.
def RenameBlockDevices(devlist):
  """Rename a list of block devices.
  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
  @return: True if all renames succeeded, False otherwise
  # NOTE(review): the docstring above describes 3-tuples, but the loop
  # unpacks 2 elements - confirm what callers actually pass.
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        # The cached info is keyed on the device path, which just changed.
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.
  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.
  @type file_storage_dir: str
  @param file_storage_dir: the path to check
  @return: the normalized path if valid, None otherwise
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # NOTE(review): os.path.commonprefix compares character by character,
  # not path component by component, so a sibling like '/srv/ganeti2'
  # would pass a '/srv/ganeti' base - consider a component-wise check.
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
  return file_storage_dir
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.
  @type file_storage_dir: str
  @param file_storage_dir: directory to create
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not
  # Validate the path against the cluster-wide base directory first.
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if not file_storage_dir:
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      logging.error("'%s' is not a directory", file_storage_dir)
    # makedirs creates any missing intermediate directories too.
    os.makedirs(file_storage_dir, 0750)
  except OSError, err:
    logging.error("Cannot create file storage directory '%s': %s",
                  file_storage_dir, err)
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.
  Remove it only if it's empty. If not log an error and return.
  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if not file_storage_dir:
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      logging.error("'%s' is not a directory", file_storage_dir)
    # deletes dir only if empty, otherwise we want to return False
      os.rmdir(file_storage_dir)
    except OSError, err:
      logging.exception("Cannot remove file storage directory '%s'",
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.
  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful
  # Both endpoints must be valid paths under the cluster base directory.
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not old_file_storage_dir or not new_file_storage_dir:
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        logging.exception("Cannot rename '%s' to '%s'",
                          old_file_storage_dir, new_file_storage_dir)
      logging.error("'%s' is not a directory", old_file_storage_dir)
  # Refuse to clobber an existing destination.
  if os.path.exists(old_file_storage_dir):
    logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                  old_file_storage_dir, new_file_storage_dir)
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.
  @type file_name: str
  @param file_name: the file name we should check
  @return: whether the file is under the queue directory
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # NOTE(review): commonprefix is character-wise, not component-wise; a
  # sibling like QUEUE_DIR + '2' would share the prefix and pass.
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
    logging.error("'%s' is not a file in the queue directory",
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.
  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.
  @type file_name: str
  @param file_name: the job file name
  @param content: the new job contents
  @return: the success of the operation
  # Refuse to write outside the queue directory.
  if not _IsJobQueueFile(file_name):
  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)
def JobQueueRename(old, new):
  """Renames a job queue file.
  This is just a wrapper over L{os.rename} with proper checking.
  @param old: the old (actual) file name
  @param new: the desired file name
  @return: the success of the operation
  # Both the source and the target must live inside the queue directory.
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.
  This will set or unset the queue drain flag.
  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @return: always True
  @warning: the function always returns True
    # Drained state is signalled purely by the drain file's existence.
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
def CloseBlockDevices(disks):
  """Closes the given block devices.
  This means they will be switched to secondary mode (in case of
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      which will contain the error details in case we
    rd = _RecursiveFindBD(disk)
      return (False, "Can't find device %s" % cf)
    except errors.BlockDeviceError, err:
      # Collect per-device errors; report them all at the end.
      msg.append(str(err))
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  return (True, "All devices secondary")
2032 def ValidateHVParams(hvname, hvparams):
2033 """Validates the given hypervisor parameters.
2035 @type hvname: string
2036 @param hvname: the hypervisor name
2037 @type hvparams: dict
2038 @param hvparams: the hypervisor parameters to be validated
2039 @rtype: tuple (success, message)
2040 @return: a tuple of success and message, where success
2041 indicates the succes of the operation, and message
2042 which will contain the error details in case we
2047 hv_type = hypervisor.GetHypervisor(hvname)
2048 hv_type.ValidateParameters(hvparams)
2049 return (True, "Validation passed")
2050 except errors.HypervisorError, err:
2051 return (False, str(err))
class HooksRunner(object):
  This class is instantiated on the node side (ganeti-noded) and not
  # Hook scripts may only be named with these characters; anything else
  # is skipped by RunHooks.
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.
    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir
  # NOTE(review): no self/cls parameter - expected to be a @staticmethod;
  # confirm the decorator above this definition.
  def ExecHook(script, env):
    """Exec one hook script.
    @param script: the full path to the script
    @param env: the environment with which to exec the script
    @rtype: tuple (success, message)
    @return: a tuple of success and message, where success
        indicates the success of the operation, and message
        which will contain the error details in case we
    # exec the process using subprocess and log the output
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      # Only the first 4 KiB of output is captured/reported.
      output = child.stdout.read(4096)
      child.stdout.close()
    except EnvironmentError, err:
      output += "Hook script error: %s" % str(err)
      result = child.wait()
    except EnvironmentError, err:
      # wait() can be interrupted by signals; retry on EINTR.
      if err.errno == errno.EINTR:
    # try not to leak fds
    for fd in (fdstdin, ):
      except EnvironmentError, err:
        # just log the error
        #logging.exception("Error while closing fd %s", fd)
    return result == 0, output
  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.
    @param hpath: the path to the hooks directory which
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @param env: dictionary with the environment for the hook
    @return: list of 3-element tuples:
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script
    @raise errors.ProgrammerError: for invalid input
    if phase == constants.HOOKS_PHASE_PRE:
    elif phase == constants.HOOKS_PHASE_POST:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    # Hooks live in "<hpath>-<pre|post>.d" under the base directory.
    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # FIXME: must log output in case of failures
    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # Non-files, non-executables and oddly-named entries are skipped,
      # not failed.
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
              self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        result, output = self.ExecHook(fname, env)
          rrval = constants.HKR_FAIL
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
class IAllocatorRunner(object):
  """IAllocator runner.
  This class is instantiated on the node side (ganeti-noded) and not on
  def Run(self, name, idata):
    """Run an iallocator script.
    @param name: the iallocator script name
    @param idata: the allocator input data
    @return: four element tuple of:
      - run status (one of the IARUN_ constants)
      - fail reason (as from L{utils.RunResult})
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)
    # The input data is handed to the script via a temporary file whose
    # name is passed as the script's argument.
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
      result = utils.RunCmd([alloc_script, fin_name])
      return (constants.IARUN_FAILURE, result.stdout, result.stderr,
    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.
  # Prefix stripped from device paths when building cache file names.
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR
  # NOTE(review): takes cls - expected to be a @classmethod; confirm the
  # decorator above this definition.
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.
    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.
    @param dev_path: the C{/dev/} path name
    @return: the converted path name
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.
    @param dev_path: the pathname of the device
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
    @param iv_name: the instance-visible name of the
        device, as in L{objects.Disk.iv_name}
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
    fpath = cls._ConvertPath(dev_path)
      iv_name = "not_visible"
    # NOTE(review): 'state' is presumably derived from on_primary above -
    # confirm before relying on the cache file format.
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.
    This is just a wrapper over L{utils.RemoveFile} with a converted
    path name and logging.
    @param dev_path: the pathname of the device
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
    fpath = cls._ConvertPath(dev_path)
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      # NOTE(review): message says "update" but this is the removal path -
      # looks copy-pasted from UpdateCache.
      logging.exception("Can't update bdev cache for %s", dev_path)