4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon"""
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
  # Tail of a config-accessor helper (presumably _GetConfig; its def line
  # is elided from this chunk) — returns a fresh SimpleConfigReader.
  return ssconf.SimpleConfigReader()
def _GetSshRunner(cluster_name):
  """Build and return an SSH runner bound to the given cluster.

  @param cluster_name: name of the cluster, forwarded verbatim to
      L{ssh.SshRunner}

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @param path: the directory whose visible regular files are removed
  @param exclude: list of files to be excluded; None (the default) is
      treated as the empty list — a None sentinel is used instead of a
      literal [] so no mutable default argument is shared across calls

  """
  if exclude is None:
    exclude = []
  if not os.path.isdir(path):
    return
  # Normalize excluded paths so they compare equal to the normalized
  # full names computed in the loop below
  exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    # only plain regular files are removed; directories and symlinks
    # are deliberately left in place
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
  # JobQueuePurge body (its def line is elided from this chunk): wipes
  # the queue directory except the lock file, then the archive directory.
  """Removes job queue files and archived jobs

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
  # GetMasterInfo body (the def line, the config-reader setup assigning
  # `cfg`, and the opening try: are elided from this chunk).
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @return: (master_netdev, master_ip, master_name)

  """
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError, err:
    # NOTE(review): the elided lines after the log call presumably
    # return an error marker — confirm against the full source.
    logging.exception("Cluster configuration incomplete")
  return (master_netdev, master_ip, master_node)
def StartMaster(start_daemons):
  # NOTE(review): several original lines (return statements, else:
  # branches, result.failed and start_daemons guards) are elided from
  # this chunk; the indentation below reflects the apparent structure.
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (if someone else has it, then it won't). Then, if the start_daemons
  parameter is True, it will also start the master daemons
  (ganet-masterd and ganeti-rapi).

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
      # reached via an elided else: branch — another host answers on the IP
      logging.error("Someone else has the master ip, not activating")
    # reached via an elided else: branch — the master IP is free, claim it
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
      logging.error("Can't activate master IP: %s", result.output)
    # gratuitous ARP so neighbours update their caches quickly
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
        logging.error("Can't start daemon %s: %s", daemon, result.output)
def StopMaster(stop_daemons):
  # NOTE(review): return statements plus the result.failed and
  # stop_daemons guards are elided from this chunk.
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. Then, if the stop_daemons parameter is True, it will also
  stop the master daemons (ganet-masterd and ganeti-rapi).

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
    logging.error("Can't remove the master IP, error: %s", result.output)
  # but otherwise ignore the failure

    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  # NOTE(review): the try: around ssh.GetUserFiles (including its
  # continuation line) and the return statements are elided from this
  # chunk.
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  """
  sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
               (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
               (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
               (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
  except errors.OpExecError, err:
    logging.exception("Error while processing user ssh files")

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # restart sshd so the new host keys take effect
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
  # LeaveCluster body (the def line, try: statements and the f.close()
  # cleanup are elided from this chunk).
  """Cleans up the current node and prepares it to be removed from the cluster.

  """
  _CleanDirectory(constants.DATA_DIR)

    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    # NOTE(review): the elided lines presumably return early here
    logging.exception("Error while processing ssh files")

  f = open(pub_key, 'r')
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  # NOTE(review): the outputarray initialisation, the try/finally around
  # the boot_id read and the final return are elided from this chunk.
  """Gives back a hash with different informations about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      node information
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # boot_id changes on every reboot, letting the master detect reboots
  f = open("/proc/sys/kernel/random/boot_id", 'r')
    outputarray["bootid"] = f.read(128).rstrip("\n")
def VerifyNode(what, cluster_name):
  # NOTE(review): the result-dict initialisation, several guard
  # conditions (failure checks, my_pip/else branches, the fail list)
  # and the final return are elided from this chunk.
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  they report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for

  """
  if 'hypervisor' in what:
    result['hypervisor'] = my_dict = {}
    for hv_name in what['hypervisor']:
      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    # shuffled so the ssh checks don't always hit nodes in the same order
    random.shuffle(what['nodelist'])
    for node in what['nodelist']:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      # stored only on failure (the guard is elided from this chunk)
      result['nodelist'][node] = message
  if 'node-net-test' in what:
    result['node-net-test'] = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    for name, pip, sip in what['node-net-test']:
      # elided: locate our own entry and record my_pip/my_sip
      result['node-net-test'][my_name] = ("Can't find my own"
                                          " primary/secondary IP"
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what['node-net-test']:
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        result['node-net-test'][name] = ("failure using the %s"
def GetVolumeList(vg_name):
  # NOTE(review): the lvs-dict/sep initialisations, the result.failed
  # guard and several loop statements are elided from this chunk.
  """Compute list of logical volumes and their size.

  @param vg_name: the volume group to list
  @return: dictionary of all partions (key) with their size (in MiB), inactive
      and online status, e.g.::

        {'test1': ('20.06', True, True)}

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
    logging.error("Failed to list logical volumes, lvs output: %s",

  # NOTE(review): the pattern should ideally be a raw string (r"...")
  # so the \| escapes are unambiguous — confirm before changing.
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    match = valid_line_re.match(line)
      logging.error("Invalid line returned from lvs output: '%s'", line)
    name, size, attr = match.groups()
    # lv_attr field: position 4 is the state, position 5 the open count
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)
def ListVolumeGroups():
  """List the volume groups and their size.

  @return: dictionary with keys volume name and values the size of the
      volume

  """
  groups = utils.ListVolumeGroups()
  return groups
  # NodeVolumes body (the def line, the result.failed guard and the
  # nested helper def lines are elided from this chunk).
  """List all volumes on this node.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--options=lv_name,lv_size,devices,vg_name"])
    logging.error("Failed to list logical volumes, lvs output: %s",

    # strip the "(extent)" suffix lvs appends to the device name
    return dev.split('(')[0]

      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),

  # only lines with at least 4 '|'-separated fields are real volumes
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
def BridgesExist(bridges_list):
  # NOTE(review): the return statements (False on a missing bridge,
  # True after the loop) are elided from this chunk.
  """Check if a list of bridges exist on the current node.

  @return: True if all of them exist, false otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
def GetInstanceList(hypervisor_list):
  # NOTE(review): the results-list initialisation, the try: line and the
  # final return are elided from this chunk.
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @return: a list of all running instances on the current node
      - instance1.example.com
      - instance2.example.com

  """
  for hname in hypervisor_list:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      # NOTE(review): "hypevisor" typo in this log string — left as-is
      # since the text is emitted at runtime
      logging.exception("Error enumerating instances for hypevisor %s", hname)
      # FIXME: should we somehow not propagate this to the master?
def GetInstanceInfo(instance, hname):
  # NOTE(review): the output-dict initialisation and the final return
  # are elided from this chunk.
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @param hname: the hypervisor type of the instance

  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # iinfo tuple layout (positions used): 2=memory, 4=state, 5=cpu time
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]
def GetAllInstancesInfo(hypervisor_list):
  # NOTE(review): the output-dict initialisation, the per-instance value
  # construction, the duplicate-detection guard and the final return are
  # elided from this chunk.
  """Gather data about all instances.

  This is the equivalent of `GetInstanceInfo()`, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict of dicts
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpuus: the number of vcpus

  """
  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
      for name, inst_id, memory, vcpus, state, times in iinfo:
          # raised when two hypervisors report the same instance name
          # with differing data (guard elided from this chunk)
          raise errors.HypervisorError("Instance %s running duplicate" % name)
def AddOSToInstance(instance, os_disk, swap_disk):
  # NOTE(review): the early returns after the error logs, several string
  # continuation lines and the result.failed guard / final returns are
  # elided from this chunk.
  """Add an OS to an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  """
  inst_os = OSFromDisk(instance.os)

  create_script = inst_os.create_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %

  # per-run log file so each create attempt is individually auditable
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
                                inst_os.path, create_script, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
  env = {'HYPERVISOR': instance.hypervisor}

  result = utils.RunCmd(command, env=env)
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", command, result.fail_reason, logfile,
def RunRenameInstance(instance, old_name, os_disk, swap_disk):
  # NOTE(review): the early returns after the error logs, some string
  # continuation lines and the result.failed guard / final returns are
  # elided from this chunk.
  """Run the OS rename script for an instance.

  Args:
    instance: the instance object
    old_name: the old name of the instance
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  """
  inst_os = OSFromDisk(instance.os)

  script = inst_os.rename_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %

  # per-run log file, named after both old and new instance names
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
                                inst_os.path, script, old_name, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,

  result = utils.RunCmd(command)

    logging.error("os create command '%s' returned error: %s output: %s",
                  command, result.fail_reason, result.output)
def _GetVGInfo(vg_name):
  # NOTE(review): the retval.failed guard, the field-count check, the
  # try:/retdic assignment lines and the final return are elided from
  # this chunk.
  """Get informations about the volume group.

  Args:
    vg_name: the volume group

  Returns:
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
    where
    vg_size is the total size of the volume group in MiB
    vg_free is the free size of the volume group in MiB
    pv_count are the number of physical disks in that vg

  If an error occurs during gathering of data, we return the same dict
  with keys all set to None.

  """
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
    logging.error("volume group %s not present", vg_name)
  # strip the trailing separator vgs appends before splitting
  valarr = retval.stdout.strip().rstrip(':').split(':')
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
    except ValueError, err:
      logging.exception("Fail to parse vgs output")
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
def _GatherBlockDevs(instance):
  # NOTE(review): the block_devices initialisation, the device-is-None
  # guard, a string continuation line and the final return are elided
  # from this chunk.
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  """
  for disk in instance.disks:
    device = _RecursiveFindBD(disk)
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
    block_devices.append((disk, device))
def StartInstance(instance, extra_args):
  # NOTE(review): the early return when already running, the try: line
  # and the final returns are elided from this chunk.
  """Start an instance.

  @type instance: instance object
  @param instance: the instance object

  @return: whether the startup was successful or not

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:

  block_devices = _GatherBlockDevs(instance)
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

    hyper.StartInstance(instance, block_devices, extra_args)
  except errors.HypervisorError, err:
    logging.exception("Failed to start instance")
def ShutdownInstance(instance):
  # NOTE(review): early returns, try: lines, the sleep calls, the loop's
  # break and the for/else are elided from this chunk; indentation
  # reflects the apparent structure.
  """Shut an instance down.

  @type instance: instance object
  @param instance: the instance object

  @return: whether the shutdown was successful or not

  """
  hv_name = instance.hypervisor
  running_instances = GetInstanceList([hv_name])

  if instance.name not in running_instances:

  hyper = hypervisor.GetHypervisor(hv_name)
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logging.error("Failed to stop instance")

  # test every 10secs for 2min
  for dummy in range(11):
    if instance.name not in GetInstanceList([hv_name]):
    # the shutdown did not succeed
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      logging.exception("Failed to stop instance")
    if instance.name in GetInstanceList([hv_name]):
      logging.error("could not shutdown instance '%s' even by destroy",
def RebootInstance(instance, reboot_type, extra_args):
  # NOTE(review): the early return, try: lines and the per-branch
  # returns are elided from this chunk.
  """Reboot an instance.

  Args:
    instance    - name of instance to reboot
    reboot_type - how to reboot [soft,hard,full]

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      logging.exception("Failed to soft reboot instance")
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
      # hard reboot is implemented as a full stop + start cycle
      ShutdownInstance(instance)
      StartInstance(instance, extra_args)
    except errors.HypervisorError, err:
      logging.exception("Failed to hard reboot instance")
    raise errors.ParameterError("reboot_type invalid")
def MigrateInstance(instance, target, live):
  # NOTE(review): the try: line and the failure return in the except
  # branch are elided from this chunk.
  """Migrates an instance to another node.

  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @param target: the target node name
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)

  @return: a tuple of (success, msg) where:
      - succes is a boolean denoting the success/failure of the operation
      - msg is a string with details in case of failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)

    hyper.MigrateInstance(instance.name, target, live)
  except errors.HypervisorError, err:
    msg = "Failed to migrate instance: %s" % str(err)
  # NOTE(review): "successfull" typo in the runtime string below — left
  # unchanged here since this is a behavior-carrying literal
  return (True, "Migration successfull")
def CreateBlockDevice(disk, size, owner, on_primary, info):
  # NOTE(review): the clist initialisation, try: lines, Remove()/Open()
  # calls, the bdev.Create continuation, the device-is-None guard and
  # the final return are elided from this chunk.
  """Creates a block device for an instance.

  Args:
    disk: a ganeti.objects.Disk object
    size: the size of the physical underlying device
    owner: a string with the name of the instance
    on_primary: a boolean indicating if it is the primary node or not
    info: string that will be sent to the physical device creation

  Returns:
    the new unique_id of the device (this can sometime be
    computed only after creation), or None. On secondary nodes,
    it's not required to return anything.

  """
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
    # best-effort removal of a stale device with the same id
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
    if device is not None:
      logging.info("removing existing device %s", disk)
  except errors.BlockDeviceError, err:
  device = bdev.Create(disk.dev_type, disk.physical_id,
    raise ValueError("Can't create child device for %s, %s" %
  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation"
      logging.error(errorstring)
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                    " daemon logs" % errorstring)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  physical_id = device.unique_id
def RemoveBlockDevice(disk):
  # NOTE(review): the try: line, the rdev-is-None handling, the result
  # guard around the cache removal and the final return are elided from
  # this chunk.
  """Remove a block device.

  This is intended to be called recursively.

  """
    # since we are removing the device, allow a partial match
    # this allows removal of broken mirrors
    rdev = _RecursiveFindBD(disk, allow_partial=True)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    r_path = rdev.dev_path
    result = rdev.Remove()
      DevCacheManager.RemoveCache(r_path)
    for child in disk.children:
      result = result and RemoveBlockDevice(child)
def _RecursiveAssembleBD(disk, owner, as_primary):
  # NOTE(review): the children-list initialisation, several guards
  # (if mcn == -1, the raise in the except branch, cdev = None), the
  # Open() call, the else branch and the final return are elided from
  # this chunk.
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  This function is called recursively.

  Args:
    disk: a objects.Disk object
    as_primary: if we should make the block device read/write

  Returns:
    the assembled device or None (in case no device was assembled)

  If the assembly is not successful, an exception is raised.

  """
    mcn = disk.ChildrenNeeded()
      mcn = 0 # max number of Nones allowed
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # too many children already failed: re-raise (guard elided)
        if children.count(None) >= mcn:
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    if as_primary or disk.OpenOnSecondary():
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)
def AssembleBlockDevice(disk, owner, as_primary):
  # NOTE(review): the final return is elided from this chunk.
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  Returns:
    a /dev path for primary nodes
    True for secondary nodes

  """
  result = _RecursiveAssembleBD(disk, owner, as_primary)
  # collapse the device object into its /dev path for the caller
  if isinstance(result, bdev.BlockDev):
    result = result.dev_path
def ShutdownBlockDevice(disk):
  # NOTE(review): the result guard around the cache removal, the else
  # branch and the final return are elided from this chunk.
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    for child in disk.children:
      result = result and ShutdownBlockDevice(child)
def MirrorAddChildren(parent_cdev, new_cdevs):
  # NOTE(review): the failure returns after the error logs and the final
  # success return are elided from this chunk.
  """Extend a mirrored block device.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent_bdev is None:
    logging.error("Can't find parent device")
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  # NOTE(review): the devs-list initialisation, the rpath guards, error
  # returns and the final success return are elided from this chunk.
  """Shrink a mirrored block device.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
      # no static path: resolve the device dynamically
      bd = _RecursiveFindBD(disk)
        logging.error("Can't find dynamic device %s while removing children",
        devs.append(bd.dev_path)
  parent_bdev.RemoveChildren(devs)
def GetMirrorStatus(disks):
  # NOTE(review): the stats-list initialisation, the loop header, the
  # rbd-is-None guard and the final return are elided from this chunk.
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  """
    rbd = _RecursiveFindBD(dsk)
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
    stats.append(rbd.CombinedSyncStatus())
def _RecursiveFindBD(disk, allow_partial=False):
  # NOTE(review): the children-list initialisation and the
  # disk.children guard are elided from this chunk.
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
def FindBlockDevice(disk):
  # NOTE(review): the rbd-is-None early return is elided from this
  # chunk.
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  # NOTE(review): the allowed_files list brackets, the early failure
  # returns and the final success return are elided from this chunk.
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",

    # whitelist of files the master is allowed to push to this node
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
def _ErrnoOrStr(err):
  # NOTE(review): the else branch (str(err) fallback) and the final
  # return are elided from this chunk.
  """Format an EnvironmentError exception.

  If the `err` argument has an errno attribute, it will be looked up
  and converted into a textual EXXXX description. Otherwise the string
  representation of the error will be returned.

  """
  if hasattr(err, 'errno'):
    detail = errno.errorcode[err.errno]
def _OSOndiskVersion(name, os_dir):
  # NOTE(review): the try: lines, the open()/finally f.close() around
  # the readlines call, a string continuation and the final return are
  # elided from this chunk.
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the os given by
  the 'name' parameter and residing in the 'os_dir' directory.

  Return value will be either an integer denoting the version or None in the
  case when this is not a valid OS name.

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"

      api_versions = f.readlines()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  api_versions = [version.strip() for version in api_versions]
    api_versions = [int(version) for version in api_versions]
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))
def DiagnoseOS(top_dirs=None):
  # NOTE(review): the result-list initialisation, try: lines, the
  # break/continue after the directory-listing failure and the final
  # return are elided from this chunk.
  """Compute the validity for all OSes.

  Returns an OS object for each name in all the given top directories
  (if not given defaults to constants.OS_SEARCH_PATH)

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s", dir_name)
      for name in f_names:
          os_inst = OSFromDisk(name, base_dir=dir_name)
          result.append(os_inst)
        except errors.InvalidOS, err:
          # invalid OSes are still reported, wrapped in an OS object
          result.append(objects.OS.FromInvalidOS(err))
def OSFromDisk(name, base_dir=None):
  # NOTE(review): the os_dir-is-None guard, else: branches, try: lines
  # and two string continuation lines are elided from this chunk.
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  `errors.InvalidOS` exception, detailing why this is not a valid
  OS.

  Args:
    os_dir: Directory containing the OS scripts. Defaults to a search
            in all the OS_SEARCH_PATH directories.

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
    os_dir = os.path.sep.join([base_dir, name])

  api_versions = _OSOndiskVersion(name, os_dir)

  if constants.OS_API_VERSION not in api_versions:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_versions, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    # each script must be an executable regular file
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %

  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts['create'],
                    export_script=os_scripts['export'],
                    import_script=os_scripts['import'],
                    rename_script=os_scripts['rename'],
                    api_versions=api_versions)
def GrowBlockDevice(disk, amount):
  # NOTE(review): the r_dev-is-None guard, the try: line with the
  # Grow() call and the final success return are elided from this
  # chunk.
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first one resize.

  Args:
    disk: the disk to be grown

  Returns: a tuple of (status, result), with:
    status: the result (true/false) of the operation
    result: the error message if the operation failed, otherwise not used

  """
  r_dev = _RecursiveFindBD(disk)
    return False, "Cannot find block device %s" % (disk,)
  except errors.BlockDeviceError, err:
    return False, str(err)
def SnapshotBlockDevice(disk):
  # NOTE(review): the outer disk.children guard, else: branches and a
  # None return are elided from this chunk; indentation reflects the
  # apparent structure.
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  Args:
    disk: the disk to be snapshotted

  Returns:
    a config entry for the actual lvm device snapshotted.

  """
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))
1337 def ExportSnapshot(disk, dest_node, instance, cluster_name):
1338 """Export a block device snapshot to a remote node.
1341 disk: the snapshot block device
1342 dest_node: the node to send the image to
1343 instance: instance being exported
1346 True if successful, False otherwise.
1349 inst_os = OSFromDisk(instance.os)
1350 export_script = inst_os.export_script
1352 logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1353 instance.name, int(time.time()))
1354 if not os.path.exists(constants.LOG_OS_DIR):
1355 os.mkdir(constants.LOG_OS_DIR, 0750)
1357 real_os_dev = _RecursiveFindBD(disk)
1358 if real_os_dev is None:
1359 raise errors.BlockDeviceError("Block device '%s' is not set up" %
1363 destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1364 destfile = disk.physical_id[1]
1366 # the target command is built out of three individual commands,
1367 # which are joined by pipes; we check each individual command for
1370 expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1371 export_script, instance.name,
1372 real_os_dev.dev_path, logfile)
1376 destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1377 destdir, destdir, destfile)
1378 remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1379 constants.GANETI_RUNAS,
1382 # all commands have been checked, so we're safe to combine them
1383 command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1385 result = utils.RunCmd(command)
1388 logging.error("os snapshot export command '%s' returned error: %s"
1389 " output: %s", command, result.fail_reason, result.output)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @param instance: instance configuration
  @param snap_disks: snapshot block devices
  @return: False in case of error, True otherwise

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
  config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  # initialize the counter so that an instance without nics does not
  # leave it unbound (NameError); -1 keeps the "last index" semantics
  # of the value written below
  nic_count = -1
  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)

  # same NameError protection as for nic_count above
  disk_count = -1
  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
               ('%d' % disk.size))
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  try:
    config.write(cfo)
  finally:
    # make sure the file is closed even if config.write fails
    cfo.close()

  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
def ExportInfo(dest):
  """Get export configuration information.

  @param dest: directory containing the export
  @return: a serializable config parser containing the export info,
      or None if the export is missing the expected sections

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  # both the export and the instance sections must be present for the
  # export to be usable
  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    return None

  return config
1479 def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
1481 """Import an os image into an instance.
1484 instance: the instance object
1485 os_disk: the instance-visible name of the os device
1486 swap_disk: the instance-visible name of the swap device
1487 src_node: node holding the source image
1488 src_image: path to the source image on src_node
1491 False in case of error, True otherwise.
1494 inst_os = OSFromDisk(instance.os)
1495 import_script = inst_os.import_script
1497 os_device = instance.FindDisk(os_disk)
1498 if os_device is None:
1499 logging.error("Can't find this device-visible name '%s'", os_disk)
1502 swap_device = instance.FindDisk(swap_disk)
1503 if swap_device is None:
1504 logging.error("Can't find this device-visible name '%s'", swap_disk)
1507 real_os_dev = _RecursiveFindBD(os_device)
1508 if real_os_dev is None:
1509 raise errors.BlockDeviceError("Block device '%s' is not set up" %
1513 real_swap_dev = _RecursiveFindBD(swap_device)
1514 if real_swap_dev is None:
1515 raise errors.BlockDeviceError("Block device '%s' is not set up" %
1517 real_swap_dev.Open()
1519 logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1520 instance.name, int(time.time()))
1521 if not os.path.exists(constants.LOG_OS_DIR):
1522 os.mkdir(constants.LOG_OS_DIR, 0750)
1524 destcmd = utils.BuildShellCmd('cat %s', src_image)
1525 remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1526 constants.GANETI_RUNAS,
1530 impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1531 inst_os.path, import_script, instance.name,
1532 real_os_dev.dev_path, real_swap_dev.dev_path,
1535 command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1536 env = {'HYPERVISOR': instance.hypervisor}
1538 result = utils.RunCmd(command, env=env)
1541 logging.error("os import command '%s' returned error: %s"
1542 " output: %s", command, result.fail_reason, result.output)
1549 """Return a list of exports currently available on this machine.
1552 if os.path.isdir(constants.EXPORT_DIR):
1553 return utils.ListVisibleFiles(constants.EXPORT_DIR)
1558 def RemoveExport(export):
1559 """Remove an existing export from the node.
1562 export: the name of the export to remove
1565 False in case of error, True otherwise.
1568 target = os.path.join(constants.EXPORT_DIR, export)
1570 shutil.rmtree(target)
1571 # TODO: catch some of the relevant exceptions and provide a pretty
1572 # error message if rmtree fails.
1577 def RenameBlockDevices(devlist):
1578 """Rename a list of block devices.
1580 The devlist argument is a list of tuples (disk, new_logical,
1581 new_physical). The return value will be a combined boolean result
1582 (True only if all renames succeeded).
1586 for disk, unique_id in devlist:
1587 dev = _RecursiveFindBD(disk)
1592 old_rpath = dev.dev_path
1593 dev.Rename(unique_id)
1594 new_rpath = dev.dev_path
1595 if old_rpath != new_rpath:
1596 DevCacheManager.RemoveCache(old_rpath)
1597 # FIXME: we should add the new cache information here, like:
1598 # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1599 # but we don't have the owner here - maybe parse from existing
1600 # cache? for now, we only lose lvm data when we rename, which
1601 # is less critical than DRBD or MD
1602 except errors.BlockDeviceError, err:
1603 logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @param file_storage_dir: string with path
  @return: normalized file_storage_dir (string) if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # os.path.commonprefix compares character by character, so e.g.
  # "/srv/ganeti-evil" would wrongly pass a check against "/srv/ganeti";
  # require an exact match or a match on a path-component boundary
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
1634 def CreateFileStorageDir(file_storage_dir):
1635 """Create file storage directory.
1638 file_storage_dir: string containing the path
1641 tuple with first element a boolean indicating wheter dir
1642 creation was successful or not
1645 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1647 if not file_storage_dir:
1650 if os.path.exists(file_storage_dir):
1651 if not os.path.isdir(file_storage_dir):
1652 logging.error("'%s' is not a directory", file_storage_dir)
1656 os.makedirs(file_storage_dir, 0750)
1657 except OSError, err:
1658 logging.error("Cannot create file storage directory '%s': %s",
1659 file_storage_dir, err)
1664 def RemoveFileStorageDir(file_storage_dir):
1665 """Remove file storage directory.
1667 Remove it only if it's empty. If not log an error and return.
1670 file_storage_dir: string containing the path
1673 tuple with first element a boolean indicating wheter dir
1674 removal was successful or not
1677 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1679 if not file_storage_dir:
1682 if os.path.exists(file_storage_dir):
1683 if not os.path.isdir(file_storage_dir):
1684 logging.error("'%s' is not a directory", file_storage_dir)
1686 # deletes dir only if empty, otherwise we want to return False
1688 os.rmdir(file_storage_dir)
1689 except OSError, err:
1690 logging.exception("Cannot remove file storage directory '%s'",
1696 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1697 """Rename the file storage directory.
1700 old_file_storage_dir: string containing the old path
1701 new_file_storage_dir: string containing the new path
1704 tuple with first element a boolean indicating wheter dir
1705 rename was successful or not
1708 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1709 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1711 if not old_file_storage_dir or not new_file_storage_dir:
1714 if not os.path.exists(new_file_storage_dir):
1715 if os.path.isdir(old_file_storage_dir):
1717 os.rename(old_file_storage_dir, new_file_storage_dir)
1718 except OSError, err:
1719 logging.exception("Cannot rename '%s' to '%s'",
1720 old_file_storage_dir, new_file_storage_dir)
1723 logging.error("'%s' is not a directory", old_file_storage_dir)
1726 if os.path.exists(old_file_storage_dir):
1727 logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1728 old_file_storage_dir, new_file_storage_dir)
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @param file_name: the file name to check
  @return: True if the file lives in the queue directory, False otherwise

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # os.path.commonprefix compares character by character, so a sibling
  # directory sharing a name prefix with the queue dir would wrongly
  # pass; require a match on a path-component boundary instead
  file_name = os.path.normpath(file_name)
  result = (file_name == queue_dir or
            file_name.startswith(queue_dir + os.sep))

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  @param file_name: the target file inside the queue directory
  @param content: the new contents of the file
  @return: False if the file is outside the queue directory, True otherwise

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)

  return True
def JobQueueRename(old, new):
  """Renames a job queue file.

  @param old: the current name of the file
  @param new: the desired name of the file
  @return: False if either path is outside the queue directory,
      True otherwise

  """
  # both the source and the destination must be inside the queue dir
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  os.rename(old, new)

  return True
1772 def CloseBlockDevices(disks):
1773 """Closes the given block devices.
1775 This means they will be switched to secondary mode (in case of DRBD).
1780 rd = _RecursiveFindBD(cf)
1782 return (False, "Can't find device %s" % cf)
1789 except errors.BlockDeviceError, err:
1790 msg.append(str(err))
1792 return (False, "Can't make devices secondary: %s" % ",".join(msg))
1794 return (True, "All devices secondary")
1797 def ValidateHVParams(hvname, hvparams):
1798 """Validates the given hypervisor parameters.
1800 @type hvname: string
1801 @param hvname: the hypervisor name
1802 @type hvparams: dict
1803 @param hvparams: the hypervisor parameters to be validated
1804 @rtype: tuple (bool, str)
1805 @return: tuple of (success, message)
1809 hv_type = hypervisor.GetHypervisor(hvname)
1810 hv_type.ValidateParameters(hvparams)
1811 return (True, "Validation passed")
1812 except errors.HypervisorError, err:
1813 return (False, str(err))
1816 class HooksRunner(object):
1819 This class is instantiated on the node side (ganeti-noded) and not on
1823 RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1825 def __init__(self, hooks_base_dir=None):
1826 """Constructor for hooks runner.
1829 - hooks_base_dir: if not None, this overrides the
1830 constants.HOOKS_BASE_DIR (useful for unittests)
1833 if hooks_base_dir is None:
1834 hooks_base_dir = constants.HOOKS_BASE_DIR
1835 self._BASE_DIR = hooks_base_dir
1838 def ExecHook(script, env):
1839 """Exec one hook script.
1842 - script: the full path to the script
1843 - env: the environment with which to exec the script
1846 # exec the process using subprocess and log the output
1849 fdstdin = open("/dev/null", "r")
1850 child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1851 stderr=subprocess.STDOUT, close_fds=True,
1852 shell=False, cwd="/", env=env)
1855 output = child.stdout.read(4096)
1856 child.stdout.close()
1857 except EnvironmentError, err:
1858 output += "Hook script error: %s" % str(err)
1862 result = child.wait()
1864 except EnvironmentError, err:
1865 if err.errno == errno.EINTR:
1869 # try not to leak fds
1870 for fd in (fdstdin, ):
1874 except EnvironmentError, err:
1875 # just log the error
1876 #logging.exception("Error while closing fd %s", fd)
1879 return result == 0, output
1881 def RunHooks(self, hpath, phase, env):
1882 """Run the scripts in the hooks directory.
1884 This method will not be usually overriden by child opcodes.
1887 if phase == constants.HOOKS_PHASE_PRE:
1889 elif phase == constants.HOOKS_PHASE_POST:
1892 raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1895 subdir = "%s-%s.d" % (hpath, suffix)
1896 dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1898 dir_contents = utils.ListVisibleFiles(dir_name)
1899 except OSError, err:
1903 # we use the standard python sort order,
1904 # so 00name is the recommended naming scheme
1906 for relname in dir_contents:
1907 fname = os.path.join(dir_name, relname)
1908 if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1909 self.RE_MASK.match(relname) is not None):
1910 rrval = constants.HKR_SKIP
1913 result, output = self.ExecHook(fname, env)
1915 rrval = constants.HKR_FAIL
1917 rrval = constants.HKR_SUCCESS
1918 rr.append(("%s/%s" % (subdir, relname), rrval, output))
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @param name: the name of the allocator script to look up and run
    @param idata: the allocator input data, written to a temporary file
    @return: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      # the input file is only needed for the duration of the run
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1959 class DevCacheManager(object):
1960 """Simple class for managing a cache of block device information.
1963 _DEV_PREFIX = "/dev/"
1964 _ROOT_DIR = constants.BDEV_CACHE_DIR
1967 def _ConvertPath(cls, dev_path):
1968 """Converts a /dev/name path to the cache file name.
1970 This replaces slashes with underscores and strips the /dev
1971 prefix. It then returns the full path to the cache file
1974 if dev_path.startswith(cls._DEV_PREFIX):
1975 dev_path = dev_path[len(cls._DEV_PREFIX):]
1976 dev_path = dev_path.replace("/", "_")
1977 fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
1981 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
1982 """Updates the cache information for a given device.
1985 if dev_path is None:
1986 logging.error("DevCacheManager.UpdateCache got a None dev_path")
1988 fpath = cls._ConvertPath(dev_path)
1994 iv_name = "not_visible"
1995 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
1997 utils.WriteFile(fpath, data=fdata)
1998 except EnvironmentError, err:
1999 logging.exception("Can't update bdev cache for %s", dev_path)
2002 def RemoveCache(cls, dev_path):
2003 """Remove data for a dev_path.
2006 if dev_path is None:
2007 logging.error("DevCacheManager.RemoveCache got a None dev_path")
2009 fpath = cls._ConvertPath(dev_path)
2011 utils.RemoveFile(fpath)
2012 except EnvironmentError, err:
2013 logging.exception("Can't update bdev cache for %s", dev_path)