4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon"""
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
48 return ssconf.SimpleConfigReader()
def _GetSshRunner(cluster_name):
  """Build an L{ssh.SshRunner} configured for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster the runner will operate in

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  Symlinks and subdirectories are left untouched; only plain files are
  removed.

  @type path: str
  @param path: the directory whose visible files should be removed
  @type exclude: list
  @param exclude: list of files to be excluded; defaults to the empty list
      (the former C{exclude=[]} default was a shared mutable default
      argument, a classic Python pitfall)

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths so they compare equal to the normalized
    # full names computed below
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  The queue lock file is deliberately kept so the locking protocol
  stays intact while the queue is emptied.

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
85 """Returns master information.
87 This is an utility function to compute master information, either
88 for consumption here or from the node daemon.
91 @return: (master_netdev, master_ip, master_name)
96 master_netdev = cfg.GetMasterNetdev()
97 master_ip = cfg.GetMasterIP()
98 master_node = cfg.GetMasterNode()
99 except errors.ConfigurationError, err:
100 logging.exception("Cluster configuration incomplete")
102 return (master_netdev, master_ip, master_node)
105 def StartMaster(start_daemons):
106 """Activate local node as master node.
108 The function will always try activate the IP address of the master
109 (if someone else has it, then it won't). Then, if the start_daemons
110 parameter is True, it will also start the master daemons
111 (ganet-masterd and ganeti-rapi).
115 master_netdev, master_ip, _ = GetMasterInfo()
116 if not master_netdev:
119 if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
120 if utils.OwnIpAddress(master_ip):
121 # we already have the ip:
122 logging.debug("Already started")
124 logging.error("Someone else has the master ip, not activating")
127 result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
128 "dev", master_netdev, "label",
129 "%s:0" % master_netdev])
131 logging.error("Can't activate master IP: %s", result.output)
134 result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
135 "-s", master_ip, master_ip])
136 # we'll ignore the exit code of arping
138 # and now start the master and rapi daemons
140 for daemon in 'ganeti-masterd', 'ganeti-rapi':
141 result = utils.RunCmd([daemon])
143 logging.error("Can't start daemon %s: %s", daemon, result.output)
148 def StopMaster(stop_daemons):
149 """Deactivate this node as master.
151 The function will always try to deactivate the IP address of the
152 master. Then, if the stop_daemons parameter is True, it will also
153 stop the master daemons (ganet-masterd and ganeti-rapi).
156 master_netdev, master_ip, _ = GetMasterInfo()
157 if not master_netdev:
160 result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
161 "dev", master_netdev])
163 logging.error("Can't remove the master IP, error: %s", result.output)
164 # but otherwise ignore the failure
167 # stop/kill the rapi and the master daemon
168 for daemon in constants.RAPI_PID, constants.MASTERD_PID:
169 utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
174 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
175 """Joins this node to the cluster.
177 This does the following:
178 - updates the hostkeys of the machine (rsa and dsa)
179 - adds the ssh private key to the user
180 - adds the ssh public key to the users' authorized_keys file
183 sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
184 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
185 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
186 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
187 for name, content, mode in sshd_keys:
188 utils.WriteFile(name, data=content, mode=mode)
191 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
193 except errors.OpExecError, err:
194 logging.exception("Error while processing user ssh files")
197 for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
198 utils.WriteFile(name, data=content, mode=0600)
200 utils.AddAuthorizedKey(auth_keys, sshpub)
202 utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
208 """Cleans up the current node and prepares it to be removed from the cluster.
211 _CleanDirectory(constants.DATA_DIR)
215 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
216 except errors.OpExecError:
217 logging.exception("Error while processing ssh files")
220 f = open(pub_key, 'r')
222 utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
226 utils.RemoveFile(priv_key)
227 utils.RemoveFile(pub_key)
229 # Return a reassuring string to the caller, and quit
230 raise errors.QuitGanetiException(False, 'Shutdown scheduled')
233 def GetNodeInfo(vgname, hypervisor_type):
234 """Gives back a hash with different informations about the node.
236 @type vgname: C{string}
237 @param vgname: the name of the volume group to ask for disk space information
238 @type hypervisor_type: C{str}
239 @param hypervisor_type: the name of the hypervisor to ask for
242 @return: dictionary with the following keys:
243 - vg_size is the size of the configured volume group in MiB
244 - vg_free is the free size of the volume group in MiB
245 - memory_dom0 is the memory allocated for domain0 in MiB
246 - memory_free is the currently available (free) ram in MiB
247 - memory_total is the total number of ram in MiB
251 vginfo = _GetVGInfo(vgname)
252 outputarray['vg_size'] = vginfo['vg_size']
253 outputarray['vg_free'] = vginfo['vg_free']
255 hyper = hypervisor.GetHypervisor(hypervisor_type)
256 hyp_info = hyper.GetNodeInfo()
257 if hyp_info is not None:
258 outputarray.update(hyp_info)
260 f = open("/proc/sys/kernel/random/boot_id", 'r')
262 outputarray["bootid"] = f.read(128).rstrip("\n")
269 def VerifyNode(what, cluster_name):
270 """Verify the status of the local node.
272 Based on the input L{what} parameter, various checks are done on the
275 If the I{filelist} key is present, this list of
276 files is checksummed and the file/checksum pairs are returned.
278 If the I{nodelist} key is present, we check that we have
279 connectivity via ssh with the target nodes (and check the hostname
282 If the I{node-net-test} key is present, we check that we have
283 connectivity to the given nodes via both primary IP and, if
284 applicable, secondary IPs.
287 @param what: a dictionary of things to check:
288 - filelist: list of files for which to compute checksums
289 - nodelist: list of nodes we should check ssh communication with
290 - node-net-test: list of nodes we should check node daemon port
292 - hypervisor: list with hypervisors to run the verify for
297 if 'hypervisor' in what:
298 result['hypervisor'] = my_dict = {}
299 for hv_name in what['hypervisor']:
300 my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
302 if 'filelist' in what:
303 result['filelist'] = utils.FingerprintFiles(what['filelist'])
305 if 'nodelist' in what:
306 result['nodelist'] = {}
307 random.shuffle(what['nodelist'])
308 for node in what['nodelist']:
309 success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
311 result['nodelist'][node] = message
312 if 'node-net-test' in what:
313 result['node-net-test'] = {}
314 my_name = utils.HostInfo().name
315 my_pip = my_sip = None
316 for name, pip, sip in what['node-net-test']:
322 result['node-net-test'][my_name] = ("Can't find my own"
323 " primary/secondary IP"
326 port = utils.GetNodeDaemonPort()
327 for name, pip, sip in what['node-net-test']:
329 if not utils.TcpPing(pip, port, source=my_pip):
330 fail.append("primary")
332 if not utils.TcpPing(sip, port, source=my_sip):
333 fail.append("secondary")
335 result['node-net-test'][name] = ("failure using the %s"
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs should be listed

  @rtype: dict
  @return: dictionary of all partions (key) with their size (in MiB),
      inactive and online status:
      {'test1': ('20.06', True, True)}

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  # use a raw string for the regex: the original non-raw "\|" escapes
  # only work by accident and are deprecated escape sequences
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr position 4 is the state flag, position 5 the open flag
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the size of the
      volume

  """
  # thin delegation: the actual work lives in the utils module
  return utils.ListVolumeGroups()
387 """List all volumes on this node.
390 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
392 "--options=lv_name,lv_size,devices,vg_name"])
394 logging.error("Failed to list logical volumes, lvs output: %s",
400 return dev.split('(')[0]
406 'name': line[0].strip(),
407 'size': line[1].strip(),
408 'dev': parse_dev(line[2].strip()),
409 'vg': line[3].strip(),
412 return [map_line(line.split('|')) for line in result.stdout.splitlines()
413 if line.count('|') >= 3]
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: the bridge names to check for

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]
  return not missing
430 def GetInstanceList(hypervisor_list):
431 """Provides a list of instances.
433 @type hypervisor_list: list
434 @param hypervisor_list: the list of hypervisors to query information
437 @return: a list of all running instances on the current node
438 - instance1.example.com
439 - instance2.example.com
443 for hname in hypervisor_list:
445 names = hypervisor.GetHypervisor(hname).ListInstances()
446 results.extend(names)
447 except errors.HypervisorError, err:
448 logging.exception("Error enumerating instances for hypevisor %s", hname)
449 # FIXME: should we somehow not propagate this to the master?
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # tuple positions 2/4/5 carry memory, state and cpu time respectively
    for key, pos in (('memory', 2), ('state', 4), ('time', 5)):
      output[key] = iinfo[pos]

  return output
481 def GetAllInstancesInfo(hypervisor_list):
482 """Gather data about all instances.
484 This is the equivalent of `GetInstanceInfo()`, except that it
485 computes data for all instances at once, thus being faster if one
486 needs data about more than one instance.
488 @type hypervisor_list: list
489 @param hypervisor_list: list of hypervisors to query for instance data
491 @rtype: dict of dicts
492 @return: dictionary of instance: data, with data having the following keys:
493 - memory: memory size of instance (int)
494 - state: xen state of instance (string)
495 - time: cpu time of instance (float)
496 - vcpuus: the number of vcpus
501 for hname in hypervisor_list:
502 iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
504 for name, inst_id, memory, vcpus, state, times in iinfo:
511 if name in output and output[name] != value:
512 raise errors.HypervisorError("Instance %s running duplicate"
513 " with different parameters" % name)
519 def AddOSToInstance(instance):
520 """Add an OS to an instance.
522 @type instance: L{objects.Instance}
523 @param instance: Instance whose OS is to be installed
526 inst_os = OSFromDisk(instance.os)
528 create_script = inst_os.create_script
529 create_env = OSEnvironment(instance)
531 logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
532 instance.name, int(time.time()))
533 if not os.path.exists(constants.LOG_OS_DIR):
534 os.mkdir(constants.LOG_OS_DIR, 0750)
536 command = utils.BuildShellCmd("cd %s && %s &>%s",
537 inst_os.path, create_script, logfile)
539 result = utils.RunCmd(command, env=create_env)
541 logging.error("os create command '%s' returned error: %s, logfile: %s,"
542 " output: %s", command, result.fail_reason, logfile,
549 def RunRenameInstance(instance, old_name):
550 """Run the OS rename script for an instance.
552 @type instance: L{objects.Instance}
553 @param instance: Instance whose OS is to be installed
554 @type old_name: string
555 @param old_name: previous instance name
558 inst_os = OSFromDisk(instance.os)
560 script = inst_os.rename_script
561 rename_env = OSEnvironment(instance)
562 rename_env['OLD_INSTANCE_NAME'] = old_name
564 logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
566 instance.name, int(time.time()))
567 if not os.path.exists(constants.LOG_OS_DIR):
568 os.mkdir(constants.LOG_OS_DIR, 0750)
570 command = utils.BuildShellCmd("cd %s && %s &>%s",
571 inst_os.path, script, logfile)
573 result = utils.RunCmd(command, env=rename_env)
576 logging.error("os create command '%s' returned error: %s output: %s",
577 command, result.fail_reason, result.output)
583 def _GetVGInfo(vg_name):
584 """Get informations about the volume group.
587 vg_name: the volume group
590 { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
592 vg_size is the total size of the volume group in MiB
593 vg_free is the free size of the volume group in MiB
594 pv_count are the number of physical disks in that vg
596 If an error occurs during gathering of data, we return the same dict
597 with keys all set to None.
600 retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
602 retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
603 "--nosuffix", "--units=m", "--separator=:", vg_name])
606 logging.error("volume group %s not present", vg_name)
608 valarr = retval.stdout.strip().rstrip(':').split(':')
612 "vg_size": int(round(float(valarr[0]), 0)),
613 "vg_free": int(round(float(valarr[1]), 0)),
614 "pv_count": int(valarr[2]),
616 except ValueError, err:
617 logging.exception("Fail to parse vgs output")
619 logging.error("vgs output has the wrong number of fields (expected"
620 " three): %s", str(valarr))
def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks should be gathered

  @rtype: list
  @return: list of (disk_object, found_device) pairs

  """
  block_devices = []
  for disk in instance.disks:
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    # NOTE(review): the elided lines presumably opened the device before
    # handing it to the hypervisor — confirm against upstream
    device.Open()
    block_devices.append((disk, device))
  return block_devices
642 def StartInstance(instance, extra_args):
643 """Start an instance.
645 @type instance: instance object
646 @param instance: the instance object
648 @return: whether the startup was successful or not
651 running_instances = GetInstanceList([instance.hypervisor])
653 if instance.name in running_instances:
656 block_devices = _GatherBlockDevs(instance)
657 hyper = hypervisor.GetHypervisor(instance.hypervisor)
660 hyper.StartInstance(instance, block_devices, extra_args)
661 except errors.HypervisorError, err:
662 logging.exception("Failed to start instance")
668 def ShutdownInstance(instance):
669 """Shut an instance down.
671 @type instance: instance object
672 @param instance: the instance object
674 @return: whether the startup was successful or not
677 hv_name = instance.hypervisor
678 running_instances = GetInstanceList([hv_name])
680 if instance.name not in running_instances:
683 hyper = hypervisor.GetHypervisor(hv_name)
685 hyper.StopInstance(instance)
686 except errors.HypervisorError, err:
687 logging.error("Failed to stop instance")
690 # test every 10secs for 2min
694 for dummy in range(11):
695 if instance.name not in GetInstanceList([hv_name]):
699 # the shutdown did not succeed
700 logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
703 hyper.StopInstance(instance, force=True)
704 except errors.HypervisorError, err:
705 logging.exception("Failed to stop instance")
709 if instance.name in GetInstanceList([hv_name]):
710 logging.error("could not shutdown instance '%s' even by destroy",
717 def RebootInstance(instance, reboot_type, extra_args):
718 """Reboot an instance.
721 instance - name of instance to reboot
722 reboot_type - how to reboot [soft,hard,full]
725 running_instances = GetInstanceList([instance.hypervisor])
727 if instance.name not in running_instances:
728 logging.error("Cannot reboot instance that is not running")
731 hyper = hypervisor.GetHypervisor(instance.hypervisor)
732 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
734 hyper.RebootInstance(instance)
735 except errors.HypervisorError, err:
736 logging.exception("Failed to soft reboot instance")
738 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
740 ShutdownInstance(instance)
741 StartInstance(instance, extra_args)
742 except errors.HypervisorError, err:
743 logging.exception("Failed to hard reboot instance")
746 raise errors.ParameterError("reboot_type invalid")
751 def MigrateInstance(instance, target, live):
752 """Migrates an instance to another node.
754 @type instance: L{objects.Instance}
755 @param instance: the instance definition
757 @param target: the target node name
759 @param live: whether the migration should be done live or not (the
760 interpretation of this parameter is left to the hypervisor)
762 @return: a tuple of (success, msg) where:
763 - succes is a boolean denoting the success/failure of the operation
764 - msg is a string with details in case of failure
767 hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
770 hyper.MigrateInstance(instance.name, target, live)
771 except errors.HypervisorError, err:
772 msg = "Failed to migrate instance: %s" % str(err)
775 return (True, "Migration successfull")
778 def CreateBlockDevice(disk, size, owner, on_primary, info):
779 """Creates a block device for an instance.
781 @type disk: L{objects.Disk}
782 @param disk: the object describing the disk we should create
784 @param size: the size of the physical underlying device, in MiB
786 @param owner: the name of the instance for which disk is created,
787 used for device cache data
788 @type on_primary: boolean
789 @param on_primary: indicates if it is the primary node or not
791 @param info: string that will be sent to the physical device
792 creation, used for example to set (LVM) tags on LVs
794 @return: the new unique_id of the device (this can sometime be
795 computed only after creation), or None. On secondary nodes,
796 it's not required to return anything.
801 for child in disk.children:
802 crdev = _RecursiveAssembleBD(child, owner, on_primary)
803 if on_primary or disk.AssembleOnSecondary():
804 # we need the children open in case the device itself has to
809 device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
810 if device is not None:
811 logging.info("removing existing device %s", disk)
813 except errors.BlockDeviceError, err:
816 device = bdev.Create(disk.dev_type, disk.physical_id,
819 raise ValueError("Can't create child device for %s, %s" %
821 if on_primary or disk.AssembleOnSecondary():
822 if not device.Assemble():
823 errorstring = "Can't assemble device after creation"
824 logging.error(errorstring)
825 raise errors.BlockDeviceError("%s, very unusual event - check the node"
826 " daemon logs" % errorstring)
827 device.SetSyncSpeed(constants.SYNC_SPEED)
828 if on_primary or disk.OpenOnSecondary():
829 device.Open(force=True)
830 DevCacheManager.UpdateCache(device.dev_path, owner,
831 on_primary, disk.iv_name)
835 physical_id = device.unique_id
839 def RemoveBlockDevice(disk):
840 """Remove a block device.
842 This is intended to be called recursively.
846 # since we are removing the device, allow a partial match
847 # this allows removal of broken mirrors
848 rdev = _RecursiveFindBD(disk, allow_partial=True)
849 except errors.BlockDeviceError, err:
850 # probably can't attach
851 logging.info("Can't attach to device %s in remove", disk)
854 r_path = rdev.dev_path
855 result = rdev.Remove()
857 DevCacheManager.RemoveCache(r_path)
861 for child in disk.children:
862 result = result and RemoveBlockDevice(child)
866 def _RecursiveAssembleBD(disk, owner, as_primary):
867 """Activate a block device for an instance.
869 This is run on the primary and secondary nodes for an instance.
871 This function is called recursively.
874 disk: a objects.Disk object
875 as_primary: if we should make the block device read/write
878 the assembled device or None (in case no device was assembled)
880 If the assembly is not successful, an exception is raised.
885 mcn = disk.ChildrenNeeded()
887 mcn = 0 # max number of Nones allowed
889 mcn = len(disk.children) - mcn # max number of Nones
890 for chld_disk in disk.children:
892 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
893 except errors.BlockDeviceError, err:
894 if children.count(None) >= mcn:
897 logging.debug("Error in child activation: %s", str(err))
898 children.append(cdev)
900 if as_primary or disk.AssembleOnSecondary():
901 r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
902 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
904 if as_primary or disk.OpenOnSecondary():
906 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
907 as_primary, disk.iv_name)
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  result = _RecursiveAssembleBD(disk, owner, as_primary)
  # on the primary the recursion yields a device object; callers only
  # need its path
  if isinstance(result, bdev.BlockDev):
    result = result.dev_path
  return result
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the disk to shut down

  @rtype: boolean
  @return: the success of the shutdown

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    # nothing assembled: nothing to shut down at this level
    result = True
  if disk.children:
    for child in disk.children:
      result = result and ShutdownBlockDevice(child)
  return result
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the mirror device to extend
  @type new_cdevs: list
  @param new_cdevs: the disks to add as new mirror components

  @rtype: boolean
  @return: the success of the operation

  """
  # allow a partial match: the mirror is being repaired, so some of its
  # components may be missing
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent_bdev.AddChildren(new_bdevs)
  return True
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the mirror device to shrink
  @type new_cdevs: list
  @param new_cdevs: the disks to remove from the mirror

  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  devs = []
  for disk in new_cdevs:
    # prefer the static path; fall back to locating the live device
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        logging.error("Can't find dynamic device %s while removing children",
                      disk)
        return False
      devs.append(bd.dev_path)
    else:
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
  return True
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list
  @param disks: list of `objects.Disk`

  @rtype: list
  @return: list of (mirror_done, estimated_time) tuples, which
      are the result of bdev.BlockDevice.CombinedSyncStatus()

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
    stats.append(rbd.CombinedSyncStatus())
  return stats
def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return informations about the real device.

  @type disk: L{objects.Disk}
  @param disk: the objects.Disk instance
  @param allow_partial: don't abort the find if a child of the
      device can't be found; this is intended to be used when
      repairing mirrors (kept for interface compatibility — not
      consulted in the visible body)

  @return: None if the device can't be found, otherwise the device
      instance

  """
  children = []
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  @type disk: L{objects.Disk}
  @param disk: the objects.Disk instance

  @return: None if the device can't be found, otherwise a tuple
      (device_path, major, minor, sync_percent, estimated_time,
      is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    return None
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: absolute target path; must be one of the allowed
      configuration files
  @param data: the contents to write
  @param mode: the mode to set on the file
  @param uid: owner user id
  @param gid: owner group id
  @param atime: access time to set
  @param mtime: modification time to set

  @rtype: boolean
  @return: whether the upload was accepted and performed

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # frozenset gives O(1) membership tests instead of a linear scan
  allowed_files = frozenset([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ])

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True
1088 def _ErrnoOrStr(err):
1089 """Format an EnvironmentError exception.
1091 If the `err` argument has an errno attribute, it will be looked up
1092 and converted into a textual EXXXX description. Otherwise the string
1093 representation of the error will be returned.
1096 if hasattr(err, 'errno'):
1097 detail = errno.errorcode[err.errno]
1103 def _OSOndiskVersion(name, os_dir):
1104 """Compute and return the API version of a given OS.
1106 This function will try to read the API version of the os given by
1107 the 'name' parameter and residing in the 'os_dir' directory.
1109 Return value will be either an integer denoting the version or None in the
1110 case when this is not a valid OS name.
1113 api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1116 st = os.stat(api_file)
1117 except EnvironmentError, err:
1118 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1119 " found (%s)" % _ErrnoOrStr(err))
1121 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1122 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1128 api_versions = f.readlines()
1131 except EnvironmentError, err:
1132 raise errors.InvalidOS(name, os_dir, "error while reading the"
1133 " API version (%s)" % _ErrnoOrStr(err))
1135 api_versions = [version.strip() for version in api_versions]
1137 api_versions = [int(version) for version in api_versions]
1138 except (TypeError, ValueError), err:
1139 raise errors.InvalidOS(name, os_dir,
1140 "API version is not integer (%s)" % str(err))
1145 def DiagnoseOS(top_dirs=None):
1146 """Compute the validity for all OSes.
1148 Returns an OS object for each name in all the given top directories
1149 (if not given defaults to constants.OS_SEARCH_PATH)
1155 if top_dirs is None:
1156 top_dirs = constants.OS_SEARCH_PATH
1159 for dir_name in top_dirs:
1160 if os.path.isdir(dir_name):
1162 f_names = utils.ListVisibleFiles(dir_name)
1163 except EnvironmentError, err:
1164 logging.exception("Can't list the OS directory %s", dir_name)
1166 for name in f_names:
1168 os_inst = OSFromDisk(name, base_dir=dir_name)
1169 result.append(os_inst)
1170 except errors.InvalidOS, err:
1171 result.append(objects.OS.FromInvalidOS(err))
1176 def OSFromDisk(name, base_dir=None):
1177 """Create an OS instance from disk.
1179 This function will return an OS instance if the given name is a
1180 valid OS name. Otherwise, it will raise an appropriate
1181 `errors.InvalidOS` exception, detailing why this is not a valid
1184 @type base_dir: string
1185 @keyword base_dir: Base directory containing OS installations.
1186 Defaults to a search in all the OS_SEARCH_PATH dirs.
1190 if base_dir is None:
1191 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1193 raise errors.InvalidOS(name, None, "OS dir not found in search path")
1195 os_dir = os.path.sep.join([base_dir, name])
1197 api_versions = _OSOndiskVersion(name, os_dir)
1199 if constants.OS_API_VERSION not in api_versions:
1200 raise errors.InvalidOS(name, os_dir, "API version mismatch"
1201 " (found %s want %s)"
1202 % (api_versions, constants.OS_API_VERSION))
1204 # OS Scripts dictionary, we will populate it with the actual script names
1205 os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1207 for script in os_scripts:
1208 os_scripts[script] = os.path.sep.join([os_dir, script])
1211 st = os.stat(os_scripts[script])
1212 except EnvironmentError, err:
1213 raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1214 (script, _ErrnoOrStr(err)))
1216 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1217 raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1220 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1221 raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1225 return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1226 create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1227 export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1228 import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1229 rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1230 api_versions=api_versions)
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  Builds the flat dict of environment variables (all values are
  strings) that the OS create/import/export scripts expect: global
  instance data plus per-disk and per-NIC entries indexed by position.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for os api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if a disk of the instance is not
      set up (cannot be found by L{_RecursiveFindBD})

  """
  result = {}
  # all values must be strings, as the environment is string-only
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
  result['INSTANCE_NAME'] = instance.name
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    # FIXME: When disks will have read-only mode, populate this
    result['DISK_%d_ACCESS' % idx] = 'W'
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      # the IP entry is only present when the NIC actually has one
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  return result
def GrowBlockDevice(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  base devices of the device being grown.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount to grow the device with (interpreted by the
      block device layer; presumably mebibytes — TODO confirm)
  @rtype: tuple
  @return: a tuple of (status, result), with:
      - status: the result (true/false) of the operation
      - result: the error message if the operation failed, otherwise not used

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    return False, "Cannot find block device %s" % (disk,)

  try:
    r_dev.Grow(amount)
  except errors.BlockDeviceError, err:
    return False, str(err)

  return True, None
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path
  @raise errors.ProgrammerError: when asked to snapshot a device that
      has no children and is not an lvm logical volume

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return SnapshotBlockDevice(child)
      # NOTE(review): if no child matches the parent's size we fall
      # through and return None implicitly — verify callers handle this
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      return None
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
  """Export a block device snapshot to a remote node.

  The data is piped through gzip and ssh: the OS export script writes
  the raw image to stdout, which is compressed locally and streamed to
  a `cat` running on the destination node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to whom the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @rtype: bool
  @return: the success of the operation

  """
  export_env = OSEnvironment(instance)

  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  # per-export log file, named after the OS, instance and timestamp
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_disk.Open()

  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
                               export_script, logfile)

  comprcmd = "gzip"
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command, env=export_env)

  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Serializes the instance and per-disk export metadata into the
  export's config file and atomically moves the working directory
  (C{<name>.new}) over the final export directory.

  @type instance: L{objects.Instance}
  @param instance: instance configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: snapshot block devices
  @rtype: bool
  @return: False in case of error, True otherwise

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  # must match the compression used by ExportSnapshot
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  # NOTE(review): 'nic_count'/'disk_count' below record the *last index*
  # of the enumerate loops, not the length; with an empty list the loop
  # variable would be unbound — presumably instances always have at
  # least one NIC and one disk, verify against callers
  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)

  for disk_count, disk in enumerate(snap_disks):
    if disk:
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  try:
    config.write(cfo)
  finally:
    cfo.close()

  # replace any previous export of the same instance
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export
  @rtype: L{objects.SerializableConfigParser} or None
  @return: a serializable config file containing the export info, or
      None if the file is missing the mandatory sections

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  # both the export and the instance sections must be present for the
  # export to be considered valid
  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    return None

  return config
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
  """Import an os image into an instance.

  For each source image, streams the (gzip-compressed) dump over ssh
  from the source node into the OS import script; an empty image path
  means the disk has no dump and is skipped (counted as success).

  @type instance: L{objects.Instance}
  @param instance: instance to import the disks into
  @type src_node: string
  @param src_node: source node for the disk images
  @type src_images: list of string
  @param src_images: absolute paths of the disk images
  @type cluster_name: string
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: list of boolean
  @return: each boolean represent the success of importing the n-th disk

  """
  import_env = OSEnvironment(instance)
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  # must match the 'gzip' compression used on export
  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s &>%s)", inst_os.path, import_script,
                               logfile)

  final_result = []
  for idx, image in enumerate(src_images):
    if image:
      destcmd = utils.BuildShellCmd('cat %s', image)
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                       constants.GANETI_RUNAS,
                                                       destcmd)
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
      import_env['IMPORT_INDEX'] = str(idx)
      result = utils.RunCmd(command, env=import_env)
      if result.failed:
        logging.error("disk import command '%s' returned error: %s"
                      " output: %s", command, result.fail_reason,
                      result.output)
        final_result.append(False)
      else:
        final_result.append(True)
    else:
      # no disk dump for this disk, nothing to import
      final_result.append(True)

  return final_result
1542 """Return a list of exports currently available on this machine.
1545 if os.path.isdir(constants.EXPORT_DIR):
1546 return utils.ListVisibleFiles(constants.EXPORT_DIR)
def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: bool
  @return: False in case of error, True otherwise

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.

  return True
def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  The devlist argument is a list of tuples (disk, new_logical,
  new_physical). The return value will be a combined boolean result
  (True only if all renames succeeded).

  @type devlist: list of tuples
  @param devlist: (disk, new unique id) pairs to process
  @rtype: bool
  @return: True only if every rename succeeded

  """
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      # device not found: record the failure but keep processing the
      # rest of the list
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        # the device node moved, so the old cache entry is stale
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  return result
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks wheter the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check
  @rtype: str or None
  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # NOTE(review): commonprefix compares strings, not path components,
  # so a sibling like "<base>2" would also pass this check — confirm
  # whether that is acceptable here
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating wheter dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  # the trailing commas make one-element tuples; presumably the RPC
  # layer expects a tuple result — confirm against callers
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        # path exists but is not a directory: cannot use it
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
    else:
      try:
        os.makedirs(file_storage_dir, 0750)
      except OSError, err:
        logging.error("Cannot create file storage directory '%s': %s",
                      file_storage_dir, err)
        result = False,
  return result
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: string containing the path

  @rtype: tuple
  @return: tuple with first element a boolean indicating wheter dir
      removal was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  # one-element tuple results, same convention as CreateFileStorageDir
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
      # deletes dir only if empty, otherwise we want to return False
      try:
        os.rmdir(file_storage_dir)
      except OSError, err:
        logging.exception("Cannot remove file storage directory '%s'",
                          file_storage_dir)
        result = False,
  return result
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: string containing the old path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: string containing the new path

  @rtype: tuple
  @return: tuple with first element a boolean indicating wheter dir
      rename was successful or not

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  # one-element tuple results, same convention as CreateFileStorageDir
  result = True,
  if not old_file_storage_dir or not new_file_storage_dir:
    result = False,
  else:
    if not os.path.exists(new_file_storage_dir):
      if os.path.isdir(old_file_storage_dir):
        try:
          os.rename(old_file_storage_dir, new_file_storage_dir)
        except OSError, err:
          logging.exception("Cannot rename '%s' to '%s'",
                            old_file_storage_dir, new_file_storage_dir)
          result = False,
      else:
        logging.error("'%s' is not a directory", old_file_storage_dir)
        result = False,
    else:
      # refusing to rename onto an existing path
      if os.path.exists(old_file_storage_dir):
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                      old_file_storage_dir, new_file_storage_dir)
        result = False,
  return result
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: bool
  @return: whether the file lives under the queue directory

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # NOTE(review): string-prefix check, not path-component check; a
  # sibling like "<queue_dir>2/..." would also match — confirm this is
  # acceptable for the callers
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  @type file_name: str
  @param file_name: the target file name, which must live in the
      queue directory
  @type content: str
  @param content: the new contents of the file
  @rtype: bool
  @return: False if the file is outside the queue dir, True otherwise

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)

  return True
def JobQueueRename(old, new):
  """Renames a job queue file.

  @type old: str
  @param old: the current name of the file
  @type new: str
  @param new: the desired name of the file
  @rtype: bool
  @return: False unless both names live in the queue dir, True on
      successful rename

  """
  # both source and destination must be inside the queue directory
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  os.rename(old, new)

  return True
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: bool
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: bool
  @return: always True

  """
  if drain_flag:
    # the drain flag is simply the existence of this (empty) file
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

  return True
def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of DRBD).

  @type disks: list of L{objects.Disk}
  @param disks: the disks to be closed
  @rtype: tuple (bool, str)
  @return: (success, message); a missing device aborts immediately,
      while per-device close errors are collected and reported together

  """
  msg = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  else:
    return (True, "All devices secondary")
1806 def ValidateHVParams(hvname, hvparams):
1807 """Validates the given hypervisor parameters.
1809 @type hvname: string
1810 @param hvname: the hypervisor name
1811 @type hvparams: dict
1812 @param hvparams: the hypervisor parameters to be validated
1813 @rtype: tuple (bool, str)
1814 @return: tuple of (success, message)
1818 hv_type = hypervisor.GetHypervisor(hvname)
1819 hv_type.ValidateParameters(hvparams)
1820 return (True, "Validation passed")
1821 except errors.HypervisorError, err:
1822 return (False, str(err))
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  # only simple names (letters, digits, underscore, dash) are accepted
  # as hook file names; anything else is skipped
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    @type script: str
    @param script: the full path to the script
    @type env: dict
    @param env: the environment with which to exec the script
    @rtype: tuple
    @return: (success as bool, captured output)

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        # only the first 4 KB of combined stdout/stderr are kept
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            # wait() was interrupted by a signal, retry it
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will not be usually overriden by child opcodes.

    @type hpath: str
    @param hpath: the hook path, used to build the hooks subdirectory name
    @type phase: str
    @param phase: one of constants.HOOKS_PHASE_PRE or
        constants.HOOKS_PHASE_POST
    @type env: dict
    @param env: environment passed to every executed hook
    @rtype: list
    @return: list of (relative script name, HKR_* status, output) tuples
    @raise errors.ProgrammerError: for an unknown hooks phase

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # presumably a missing hooks directory is not an error and means
      # there is nothing to run — TODO confirm
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # skip anything that is not an executable regular file with an
      # allowed name
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the name of the allocator script, looked up in the
        iallocator search path
    @type idata: str
    @param idata: the allocator input data, written to a temporary file
        whose name is passed to the script as its only argument

    @rtype: tuple
    @return: tuple of:
        - run status (one of the IARUN_ constants)
        - standard output of the script (or None)
        - standard error of the script (or None)
        - fail reason (as from utils.RunResult)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      # the temporary input file is always cleaned up
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1968 class DevCacheManager(object):
1969 """Simple class for managing a cache of block device information.
1972 _DEV_PREFIX = "/dev/"
1973 _ROOT_DIR = constants.BDEV_CACHE_DIR
1976 def _ConvertPath(cls, dev_path):
1977 """Converts a /dev/name path to the cache file name.
1979 This replaces slashes with underscores and strips the /dev
1980 prefix. It then returns the full path to the cache file
1983 if dev_path.startswith(cls._DEV_PREFIX):
1984 dev_path = dev_path[len(cls._DEV_PREFIX):]
1985 dev_path = dev_path.replace("/", "_")
1986 fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
1990 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
1991 """Updates the cache information for a given device.
1994 if dev_path is None:
1995 logging.error("DevCacheManager.UpdateCache got a None dev_path")
1997 fpath = cls._ConvertPath(dev_path)
2003 iv_name = "not_visible"
2004 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2006 utils.WriteFile(fpath, data=fdata)
2007 except EnvironmentError, err:
2008 logging.exception("Can't update bdev cache for %s", dev_path)
2011 def RemoveCache(cls, dev_path):
2012 """Remove data for a dev_path.
2015 if dev_path is None:
2016 logging.error("DevCacheManager.RemoveCache got a None dev_path")
2018 fpath = cls._ConvertPath(dev_path)
2020 utils.RemoveFile(fpath)
2021 except EnvironmentError, err:
2022 logging.exception("Can't update bdev cache for %s", dev_path)