4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon"""
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
def _GetConfig():
  """Simple wrapper to return a SimpleConfigReader.

  @rtype: L{ssconf.SimpleConfigReader}
  @return: a SimpleConfigReader instance

  """
  # NOTE(review): the original def-line was lost in this chunk; reconstructed
  # as a zero-argument wrapper around the visible return statement.
  return ssconf.SimpleConfigReader()
def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, used for host key verification

  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  Symlinks and subdirectories are left untouched.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded; defaults to the empty list
      (the original used a mutable default argument ``exclude=[]``, which is
      shared across calls -- replaced with the None-sentinel idiom)

  """
  if exclude is None:
    exclude = []
  if not os.path.isdir(path):
    return
  # Normalize excluded paths so comparison with normalized entries works
  exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    # only remove plain files, never symlinks or directories
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  The queue lock file is kept so that the currently-held queue lock
  stays valid.

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
85 """Returns master information.
87 This is an utility function to compute master information, either
88 for consumption here or from the node daemon.
91 @return: (master_netdev, master_ip, master_name)
96 master_netdev = cfg.GetMasterNetdev()
97 master_ip = cfg.GetMasterIP()
98 master_node = cfg.GetMasterNode()
99 except errors.ConfigurationError, err:
100 logging.exception("Cluster configuration incomplete")
102 return (master_netdev, master_ip, master_node)
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (if someone else has it, then it won't). Then, if the start_daemons
  parameter is True, it will also start the master daemons
  (ganeti-masterd and ganeti-rapi).

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master daemons

  @rtype: boolean
  @return: True on success, False otherwise

  """
  ok = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    # gratuitous ARP so other hosts update their ARP caches quickly
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. Then, if the stop_daemons parameter is True, it will also
  stop the master daemons (ganeti-masterd and ganeti-rapi).

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons

  @rtype: boolean
  @return: True (failures to remove the IP are logged but ignored)

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True
174 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
175 """Joins this node to the cluster.
177 This does the following:
178 - updates the hostkeys of the machine (rsa and dsa)
179 - adds the ssh private key to the user
180 - adds the ssh public key to the users' authorized_keys file
183 sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
184 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
185 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
186 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
187 for name, content, mode in sshd_keys:
188 utils.WriteFile(name, data=content, mode=mode)
191 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
193 except errors.OpExecError, err:
194 logging.exception("Error while processing user ssh files")
197 for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
198 utils.WriteFile(name, data=content, mode=0600)
200 utils.AddAuthorizedKey(auth_keys, sshpub)
202 utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
def LeaveCluster():
  """Cleans up the current node and prepares it to be removed from the cluster.

  Data files and the user ssh keys are removed; at the end a
  L{errors.QuitGanetiException} is raised to signal the node daemon to
  shut down.

  """
  _CleanDirectory(constants.DATA_DIR)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  f = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different informations about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information

  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # the boot id identifies the current boot of the machine
  f = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = f.read(128).rstrip("\n")
  finally:
    f.close()

  return outputarray
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: dict
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: str
  @param cluster_name: the cluster's name, used for ssh host key checking

  @rtype: dict
  @return: a dictionary with the same keys as the input, holding the
      per-check results

  """
  result = {}

  if 'hypervisor' in what:
    result['hypervisor'] = my_dict = {}
    for hv_name in what['hypervisor']:
      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    # shuffle so not all nodes hammer the same targets in the same order
    random.shuffle(what['nodelist'])
    for node in what['nodelist']:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        result['nodelist'][node] = message
  if 'node-net-test' in what:
    result['node-net-test'] = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own primary/secondary IPs in the node list
    for name, pip, sip in what['node-net-test']:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      result['node-net-test'][my_name] = ("Can't find my own"
                                          " primary/secondary IP"
                                          " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what['node-net-test']:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          result['node-net-test'][name] = ("failure using the %s"
                                           " interface(s)" %
                                           " and ".join(fail))

  return result
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group to query

  @rtype: dict
  @return: dictionary of all partions (key) with their size (in MiB), inactive
      and online status::

          {'test1': ('20.06', True, True)}

      on lvs failure the raw lvs output string is returned instead

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr position 4 is the state flag, position 5 the open flag
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the size of the volume

  """
  return utils.ListVolumeGroups()
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list of dicts
  @return: a list of dictionaries, each with keys:
      - name: the logical volume name
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return []

  def parse_dev(dev):
    # strip the extent information, e.g. "/dev/sda(0)" -> "/dev/sda"
    if '(' in dev:
      return dev.split('(')[0]
    else:
      return dev

  def map_line(line):
    return {
      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),
    }

  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: the list of bridge names to check

  @rtype: boolean
  @return: True if all of them exist, false otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      return False

  return True
430 def GetInstanceList(hypervisor_list):
431 """Provides a list of instances.
433 @type hypervisor_list: list
434 @param hypervisor_list: the list of hypervisors to query information
437 @return: a list of all running instances on the current node
438 - instance1.example.com
439 - instance2.example.com
443 for hname in hypervisor_list:
445 names = hypervisor.GetHypervisor(hname).ListInstances()
446 results.extend(names)
447 except errors.HypervisorError, err:
448 logging.exception("Error enumerating instances for hypevisor %s", hname)
449 # FIXME: should we somehow not propagate this to the master?
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # iinfo layout per the hypervisor interface: index 2 is memory,
    # 4 is state and 5 is cpu time
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of `GetInstanceInfo()`, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict of dicts
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpuus: the number of vcpus

  @raise errors.HypervisorError: when an instance is reported by more
      than one hypervisor with differing parameters

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, inst_id, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output and output[name] != value:
          raise errors.HypervisorError("Instance %s running duplicate"
                                       " with different parameters" % name)
        output[name] = value

  return output
519 def AddOSToInstance(instance, os_disk, swap_disk):
520 """Add an OS to an instance.
523 instance: the instance object
524 os_disk: the instance-visible name of the os device
525 swap_disk: the instance-visible name of the swap device
528 inst_os = OSFromDisk(instance.os)
530 create_script = inst_os.create_script
532 os_device = instance.FindDisk(os_disk)
533 if os_device is None:
534 logging.error("Can't find this device-visible name '%s'", os_disk)
537 swap_device = instance.FindDisk(swap_disk)
538 if swap_device is None:
539 logging.error("Can't find this device-visible name '%s'", swap_disk)
542 real_os_dev = _RecursiveFindBD(os_device)
543 if real_os_dev is None:
544 raise errors.BlockDeviceError("Block device '%s' is not set up" %
548 real_swap_dev = _RecursiveFindBD(swap_device)
549 if real_swap_dev is None:
550 raise errors.BlockDeviceError("Block device '%s' is not set up" %
554 logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
555 instance.name, int(time.time()))
556 if not os.path.exists(constants.LOG_OS_DIR):
557 os.mkdir(constants.LOG_OS_DIR, 0750)
559 command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
560 inst_os.path, create_script, instance.name,
561 real_os_dev.dev_path, real_swap_dev.dev_path,
563 env = {'HYPERVISOR': instance.hypervisor}
565 result = utils.RunCmd(command, env=env)
567 logging.error("os create command '%s' returned error: %s, logfile: %s,"
568 " output: %s", command, result.fail_reason, logfile,
575 def RunRenameInstance(instance, old_name, os_disk, swap_disk):
576 """Run the OS rename script for an instance.
579 instance: the instance object
580 old_name: the old name of the instance
581 os_disk: the instance-visible name of the os device
582 swap_disk: the instance-visible name of the swap device
585 inst_os = OSFromDisk(instance.os)
587 script = inst_os.rename_script
589 os_device = instance.FindDisk(os_disk)
590 if os_device is None:
591 logging.error("Can't find this device-visible name '%s'", os_disk)
594 swap_device = instance.FindDisk(swap_disk)
595 if swap_device is None:
596 logging.error("Can't find this device-visible name '%s'", swap_disk)
599 real_os_dev = _RecursiveFindBD(os_device)
600 if real_os_dev is None:
601 raise errors.BlockDeviceError("Block device '%s' is not set up" %
605 real_swap_dev = _RecursiveFindBD(swap_device)
606 if real_swap_dev is None:
607 raise errors.BlockDeviceError("Block device '%s' is not set up" %
611 logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
613 instance.name, int(time.time()))
614 if not os.path.exists(constants.LOG_OS_DIR):
615 os.mkdir(constants.LOG_OS_DIR, 0750)
617 command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
618 inst_os.path, script, old_name, instance.name,
619 real_os_dev.dev_path, real_swap_dev.dev_path,
622 result = utils.RunCmd(command)
625 logging.error("os create command '%s' returned error: %s output: %s",
626 command, result.fail_reason, result.output)
632 def _GetVGInfo(vg_name):
633 """Get informations about the volume group.
636 vg_name: the volume group
639 { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
641 vg_size is the total size of the volume group in MiB
642 vg_free is the free size of the volume group in MiB
643 pv_count are the number of physical disks in that vg
645 If an error occurs during gathering of data, we return the same dict
646 with keys all set to None.
649 retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
651 retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
652 "--nosuffix", "--units=m", "--separator=:", vg_name])
655 logging.error("volume group %s not present", vg_name)
657 valarr = retval.stdout.strip().rstrip(':').split(':')
661 "vg_size": int(round(float(valarr[0]), 0)),
662 "vg_free": int(round(float(valarr[1]), 0)),
663 "pv_count": int(valarr[2]),
665 except ValueError, err:
666 logging.exception("Fail to parse vgs output")
668 logging.error("vgs output has the wrong number of fields (expected"
669 " three): %s", str(valarr))
def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @param instance: the instance whose disks we gather
  @return: list of (disk, device) pairs for the instance's disks
  @raise errors.BlockDeviceError: when a device cannot be found

  """
  block_devices = []
  for disk in instance.disks:
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    block_devices.append((disk, device))

  return block_devices
691 def StartInstance(instance, extra_args):
692 """Start an instance.
694 @type instance: instance object
695 @param instance: the instance object
697 @return: whether the startup was successful or not
700 running_instances = GetInstanceList([instance.hypervisor])
702 if instance.name in running_instances:
705 block_devices = _GatherBlockDevs(instance)
706 hyper = hypervisor.GetHypervisor(instance.hypervisor)
709 hyper.StartInstance(instance, block_devices, extra_args)
710 except errors.HypervisorError, err:
711 logging.exception("Failed to start instance")
717 def ShutdownInstance(instance):
718 """Shut an instance down.
720 @type instance: instance object
721 @param instance: the instance object
723 @return: whether the startup was successful or not
726 hv_name = instance.hypervisor
727 running_instances = GetInstanceList([hv_name])
729 if instance.name not in running_instances:
732 hyper = hypervisor.GetHypervisor(hv_name)
734 hyper.StopInstance(instance)
735 except errors.HypervisorError, err:
736 logging.error("Failed to stop instance")
739 # test every 10secs for 2min
743 for dummy in range(11):
744 if instance.name not in GetInstanceList([hv_name]):
748 # the shutdown did not succeed
749 logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
752 hyper.StopInstance(instance, force=True)
753 except errors.HypervisorError, err:
754 logging.exception("Failed to stop instance")
758 if instance.name in GetInstanceList([hv_name]):
759 logging.error("could not shutdown instance '%s' even by destroy",
766 def RebootInstance(instance, reboot_type, extra_args):
767 """Reboot an instance.
770 instance - name of instance to reboot
771 reboot_type - how to reboot [soft,hard,full]
774 running_instances = GetInstanceList([instance.hypervisor])
776 if instance.name not in running_instances:
777 logging.error("Cannot reboot instance that is not running")
780 hyper = hypervisor.GetHypervisor(instance.hypervisor)
781 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
783 hyper.RebootInstance(instance)
784 except errors.HypervisorError, err:
785 logging.exception("Failed to soft reboot instance")
787 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
789 ShutdownInstance(instance)
790 StartInstance(instance, extra_args)
791 except errors.HypervisorError, err:
792 logging.exception("Failed to hard reboot instance")
795 raise errors.ParameterError("reboot_type invalid")
800 def MigrateInstance(instance, target, live):
801 """Migrates an instance to another node.
803 @type instance: C{objects.Instance}
804 @param instance: the instance definition
806 @param target: the target node name
808 @param live: whether the migration should be done live or not (the
809 interpretation of this parameter is left to the hypervisor)
811 @return: a tuple of (success, msg) where:
812 - succes is a boolean denoting the success/failure of the operation
813 - msg is a string with details in case of failure
816 hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
819 hyper.MigrateInstance(instance.name, target, live)
820 except errors.HypervisorError, err:
821 msg = "Failed to migrate instance: %s" % str(err)
824 return (True, "Migration successfull")
827 def CreateBlockDevice(disk, size, owner, on_primary, info):
828 """Creates a block device for an instance.
831 disk: a ganeti.objects.Disk object
832 size: the size of the physical underlying device
833 owner: a string with the name of the instance
834 on_primary: a boolean indicating if it is the primary node or not
835 info: string that will be sent to the physical device creation
838 the new unique_id of the device (this can sometime be
839 computed only after creation), or None. On secondary nodes,
840 it's not required to return anything.
845 for child in disk.children:
846 crdev = _RecursiveAssembleBD(child, owner, on_primary)
847 if on_primary or disk.AssembleOnSecondary():
848 # we need the children open in case the device itself has to
853 device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
854 if device is not None:
855 logging.info("removing existing device %s", disk)
857 except errors.BlockDeviceError, err:
860 device = bdev.Create(disk.dev_type, disk.physical_id,
863 raise ValueError("Can't create child device for %s, %s" %
865 if on_primary or disk.AssembleOnSecondary():
866 if not device.Assemble():
867 errorstring = "Can't assemble device after creation"
868 logging.error(errorstring)
869 raise errors.BlockDeviceError("%s, very unusual event - check the node"
870 " daemon logs" % errorstring)
871 device.SetSyncSpeed(constants.SYNC_SPEED)
872 if on_primary or disk.OpenOnSecondary():
873 device.Open(force=True)
874 DevCacheManager.UpdateCache(device.dev_path, owner,
875 on_primary, disk.iv_name)
879 physical_id = device.unique_id
883 def RemoveBlockDevice(disk):
884 """Remove a block device.
886 This is intended to be called recursively.
890 # since we are removing the device, allow a partial match
891 # this allows removal of broken mirrors
892 rdev = _RecursiveFindBD(disk, allow_partial=True)
893 except errors.BlockDeviceError, err:
894 # probably can't attach
895 logging.info("Can't attach to device %s in remove", disk)
898 r_path = rdev.dev_path
899 result = rdev.Remove()
901 DevCacheManager.RemoveCache(r_path)
905 for child in disk.children:
906 result = result and RemoveBlockDevice(child)
910 def _RecursiveAssembleBD(disk, owner, as_primary):
911 """Activate a block device for an instance.
913 This is run on the primary and secondary nodes for an instance.
915 This function is called recursively.
918 disk: a objects.Disk object
919 as_primary: if we should make the block device read/write
922 the assembled device or None (in case no device was assembled)
924 If the assembly is not successful, an exception is raised.
929 mcn = disk.ChildrenNeeded()
931 mcn = 0 # max number of Nones allowed
933 mcn = len(disk.children) - mcn # max number of Nones
934 for chld_disk in disk.children:
936 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
937 except errors.BlockDeviceError, err:
938 if children.count(None) >= mcn:
941 logging.debug("Error in child activation: %s", str(err))
942 children.append(cdev)
944 if as_primary or disk.AssembleOnSecondary():
945 r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
946 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
948 if as_primary or disk.OpenOnSecondary():
950 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
951 as_primary, disk.iv_name)
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  Returns:
    a /dev path for primary nodes
    True for secondary nodes

  """
  result = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(result, bdev.BlockDev):
    result = result.dev_path
  return result
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @param disk: the objects.Disk to shut down
  @rtype: boolean
  @return: True if the device (and its children) were shut down

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    result = True
  if disk.children:
    for child in disk.children:
      result = result and ShutdownBlockDevice(child)
  return result
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @param parent_cdev: the objects.Disk of the mirror to extend
  @param new_cdevs: list of objects.Disk to add as children
  @rtype: boolean
  @return: True on success, False if any device cannot be found

  """
  # allow a partial match, since we are extending (possibly broken) mirrors
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent_bdev.AddChildren(new_bdevs)
  return True
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @param parent_cdev: the objects.Disk of the mirror to shrink
  @param new_cdevs: list of objects.Disk children to remove
  @rtype: boolean
  @return: True on success, False if a child device cannot be resolved

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      # no static path: resolve the device dynamically
      bd = _RecursiveFindBD(disk)
      if bd is None:
        logging.error("Can't find dynamic device %s while removing children",
                      disk)
        return False
      else:
        devs.append(bd.dev_path)
    else:
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
  return True
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  @raise errors.BlockDeviceError: when a device cannot be found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
    stats.append(rbd.CombinedSyncStatus())
  return stats
def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  # NOTE(review): allow_partial is documented but not used in the visible
  # body; the recursive calls do not forward it -- confirm against callers
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    return rbd
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: absolute target path; must be in the allowed list
  @param data: the new file contents
  @param mode: the mode to set on the file
  @param uid: owner uid
  @param gid: owner gid
  @param atime: access time to set
  @param mtime: modification time to set

  @rtype: boolean
  @return: True on success, False if the target is not allowed

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  allowed_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True
1132 def _ErrnoOrStr(err):
1133 """Format an EnvironmentError exception.
1135 If the `err` argument has an errno attribute, it will be looked up
1136 and converted into a textual EXXXX description. Otherwise the string
1137 representation of the error will be returned.
1140 if hasattr(err, 'errno'):
1141 detail = errno.errorcode[err.errno]
1147 def _OSOndiskVersion(name, os_dir):
1148 """Compute and return the API version of a given OS.
1150 This function will try to read the API version of the os given by
1151 the 'name' parameter and residing in the 'os_dir' directory.
1153 Return value will be either an integer denoting the version or None in the
1154 case when this is not a valid OS name.
1157 api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1160 st = os.stat(api_file)
1161 except EnvironmentError, err:
1162 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1163 " found (%s)" % _ErrnoOrStr(err))
1165 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1166 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1172 api_versions = f.readlines()
1175 except EnvironmentError, err:
1176 raise errors.InvalidOS(name, os_dir, "error while reading the"
1177 " API version (%s)" % _ErrnoOrStr(err))
1179 api_versions = [version.strip() for version in api_versions]
1181 api_versions = [int(version) for version in api_versions]
1182 except (TypeError, ValueError), err:
1183 raise errors.InvalidOS(name, os_dir,
1184 "API version is not integer (%s)" % str(err))
1189 def DiagnoseOS(top_dirs=None):
1190 """Compute the validity for all OSes.
1192 Returns an OS object for each name in all the given top directories
1193 (if not given defaults to constants.OS_SEARCH_PATH)
1199 if top_dirs is None:
1200 top_dirs = constants.OS_SEARCH_PATH
1203 for dir_name in top_dirs:
1204 if os.path.isdir(dir_name):
1206 f_names = utils.ListVisibleFiles(dir_name)
1207 except EnvironmentError, err:
1208 logging.exception("Can't list the OS directory %s", dir_name)
1210 for name in f_names:
1212 os_inst = OSFromDisk(name, base_dir=dir_name)
1213 result.append(os_inst)
1214 except errors.InvalidOS, err:
1215 result.append(objects.OS.FromInvalidOS(err))
1220 def OSFromDisk(name, base_dir=None):
1221 """Create an OS instance from disk.
1223 This function will return an OS instance if the given name is a
1224 valid OS name. Otherwise, it will raise an appropriate
1225 `errors.InvalidOS` exception, detailing why this is not a valid
1229 os_dir: Directory containing the OS scripts. Defaults to a search
1230 in all the OS_SEARCH_PATH directories.
1234 if base_dir is None:
1235 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1237 raise errors.InvalidOS(name, None, "OS dir not found in search path")
1239 os_dir = os.path.sep.join([base_dir, name])
1241 api_versions = _OSOndiskVersion(name, os_dir)
1243 if constants.OS_API_VERSION not in api_versions:
1244 raise errors.InvalidOS(name, os_dir, "API version mismatch"
1245 " (found %s want %s)"
1246 % (api_versions, constants.OS_API_VERSION))
1248 # OS Scripts dictionary, we will populate it with the actual script names
1249 os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
1251 for script in os_scripts:
1252 os_scripts[script] = os.path.sep.join([os_dir, script])
1255 st = os.stat(os_scripts[script])
1256 except EnvironmentError, err:
1257 raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1258 (script, _ErrnoOrStr(err)))
1260 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1261 raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1264 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1265 raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1269 return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1270 create_script=os_scripts['create'],
1271 export_script=os_scripts['export'],
1272 import_script=os_scripts['import'],
1273 rename_script=os_scripts['rename'],
1274 api_versions=api_versions)
1277 def GrowBlockDevice(disk, amount):
1278 """Grow a stack of block devices.
1280 This function is called recursively, with the childrens being the
1284 disk: the disk to be grown
1286 Returns: a tuple of (status, result), with:
1287 status: the result (true/false) of the operation
1288 result: the error message if the operation failed, otherwise not used
1291 r_dev = _RecursiveFindBD(disk)
1293 return False, "Cannot find block device %s" % (disk,)
1297 except errors.BlockDeviceError, err:
1298 return False, str(err)
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @param disk: the disk to be snapshotted
  @return: a config entry for the actual lvm device snapshotted

  """
  # NOTE(review): an enclosing "if disk.children:" (with a matching "else:"
  # before the recursion loop) appears to have been lost here — as written
  # the "elif" below has no preceding "if" at its level.
  if len(disk.children) == 1:
    # only one child, let's recurse on it
    return SnapshotBlockDevice(disk.children[0])
  # more than one child, choose one that matches
  for child in disk.children:
    if child.size == disk.size:
      # return implies breaking the loop
      return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
  # only LVM leaf devices can actually be snapshotted
  raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                               " '%s' of type '%s'" %
                               (disk.unique_id, disk.dev_type))
def ExportSnapshot(disk, dest_node, instance, cluster_name):
  """Export a block device snapshot to a remote node.

  @param disk: the snapshot block device
  @param dest_node: the node to send the image to
  @param instance: instance being exported
  @param cluster_name: cluster name, used when building the ssh command
  @return: True if successful, False otherwise

  """
  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  # per-run log file for the OS export script's stderr
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  real_os_dev = _RecursiveFindBD(disk)
  if real_os_dev is None:
    # NOTE(review): this raise is truncated — the argument to "%"
    # (presumably str(disk)) is missing from this view.
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # validity before combining them
  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
                               export_script, instance.name,
                               real_os_dev.dev_path, logfile)
  # NOTE(review): the assignment of "comprcmd" (the compression stage of
  # the pipeline built below) appears to have been lost from this view.
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  # NOTE(review): this BuildCmd call is truncated — its final argument (the
  # remote command to run, presumably destcmd) is missing.
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command)
  # NOTE(review): the "if result.failed:" guard and the final True/False
  # returns appear to have been lost; the logging below reads as the
  # failure branch.
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @param instance: instance configuration
  @param snap_disks: snapshot block devices
  @return: False in case of error, True otherwise

  """
  # write into a ".new" staging directory, then atomically move it in place
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    # NOTE(review): this call is truncated — the value argument (presumably
    # the nic's bridge) is missing from this view.
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
  # TODO: redundant: on load can read nics until it doesn't exist
  # NOTE(review): "nic_count" here is the index of the last nic, not the
  # number of nics — off by one if read back as a count.
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)

  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    # NOTE(review): this call is truncated — the size value is missing.
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  # NOTE(review): the config.write(cfo)/cfo.close() calls appear to have
  # been lost — as written the file handle is opened but never used.

  # replace any previous export of the same name, then publish the new one
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)
  # NOTE(review): the final "return True" appears to have been lost.
def ExportInfo(dest):
  """Get export configuration information.

  @param dest: directory containing the export
  @return: a serializable config file containing the export info

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  # NOTE(review): the call reading the file into the parser (presumably
  # config.read(cff)) appears to have been lost — "cff" is otherwise unused.

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    # NOTE(review): the body of this branch (presumably returning None) and
    # the function's final "return config" appear to have been lost.
# NOTE(review): this signature is truncated — a final parameter (presumably
# cluster_name, used by _GetSshRunner below) and the closing "):" are
# missing from this view.
def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
  """Import an os image into an instance.

  @param instance: the instance object
  @param os_disk: the instance-visible name of the os device
  @param swap_disk: the instance-visible name of the swap device
  @param src_node: node holding the source image
  @param src_image: path to the source image on src_node
  @return: False in case of error, True otherwise

  """
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    # NOTE(review): a "return False" appears to have been lost here.

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    # NOTE(review): a "return False" appears to have been lost here.

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    # NOTE(review): truncated — the argument to "%" is missing.
    raise errors.BlockDeviceError("Block device '%s' is not set up" %

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    # NOTE(review): truncated — the argument to "%" is missing.
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
  real_swap_dev.Open()

  # per-run log file for the OS import script
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  destcmd = utils.BuildShellCmd('cat %s', src_image)
  # NOTE(review): this BuildCmd call is truncated — its final argument (the
  # remote command, presumably destcmd) is missing; the "comprcmd" used in
  # the pipeline below is also never assigned in this view.
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                   constants.GANETI_RUNAS,

  # NOTE(review): this BuildShellCmd call is truncated — its final argument
  # (presumably the logfile) is missing.
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
                               inst_os.path, import_script, instance.name,
                               real_os_dev.dev_path, real_swap_dev.dev_path,
  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
  env = {'HYPERVISOR': instance.hypervisor}

  result = utils.RunCmd(command, env=env)
  # NOTE(review): the "if result.failed:" guard and the final True/False
  # returns appear to have been lost; the logging below reads as the
  # failure branch.
    logging.error("os import command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
def ListExports():
  """Return a list of exports currently available on this machine.

  @return: list of export names (the directory entries under EXPORT_DIR),
      or the empty list if the export directory does not exist

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  else:
    return []
def RemoveExport(export):
  """Remove an existing export from the node.

  @param export: the name of the export to remove
  @return: False in case of error, True otherwise

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.

  return True
def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  The devlist argument is a list of tuples (disk, new_logical,
  new_physical). The return value will be a combined boolean result
  (True only if all renames succeeded).

  """
  # NOTE(review): the result-flag initialisation and the final combined
  # return appear to have been lost from this view.
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    # NOTE(review): an "if dev is None:" guard and the "try:" matching the
    # dangling "except" below appear to have been lost.
    old_rpath = dev.dev_path
    dev.Rename(unique_id)
    new_rpath = dev.dev_path
    if old_rpath != new_rpath:
      # the cached info is keyed on the device path, so a path change
      # invalidates the old entry
      DevCacheManager.RemoveCache(old_rpath)
      # FIXME: we should add the new cache information here, like:
      # DevCacheManager.UpdateCache(new_rpath, owner, ...)
      # but we don't have the owner here - maybe parse from existing
      # cache? for now, we only lose lvm data when we rename, which
      # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks wheter the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @param file_storage_dir: string with path
  @return: normalized file_storage_dir (string) if valid, None otherwise

  """
  # NOTE(review): the assignment of "cfg" (presumably cfg = _GetConfig())
  # appears to have been lost — "cfg" is undefined as written.
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # NOTE: commonprefix compares character-wise, not per path component, so
  # sibling paths sharing a string prefix with the base dir also match.
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    # NOTE(review): a "return None" appears to have been lost here.
  return file_storage_dir
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @param file_storage_dir: string containing the path

  @return: tuple with first element a boolean indicating wheter dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  # NOTE(review): several lines appear to have been lost from this view:
  # the body of the "if not file_storage_dir:" branch, the "try:" matching
  # the dangling "except OSError" below, and the function's return values.
  if not file_storage_dir:
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      logging.error("'%s' is not a directory", file_storage_dir)
    os.makedirs(file_storage_dir, 0750)
  except OSError, err:
    logging.error("Cannot create file storage directory '%s': %s",
                  file_storage_dir, err)
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  @param file_storage_dir: string containing the path

  @return: tuple with first element a boolean indicating wheter dir
      removal was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  # NOTE(review): several lines appear to have been lost from this view:
  # the body of the "if not file_storage_dir:" branch, the "try:" matching
  # the dangling "except OSError" below, and the function's return values.
  if not file_storage_dir:
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      logging.error("'%s' is not a directory", file_storage_dir)
    # deletes dir only if empty, otherwise we want to return False
    os.rmdir(file_storage_dir)
  except OSError, err:
    # NOTE(review): this call is truncated — the file_storage_dir argument
    # and closing parenthesis are missing.
    logging.exception("Cannot remove file storage directory '%s'",
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @param old_file_storage_dir: string containing the old path
  @param new_file_storage_dir: string containing the new path

  @return: tuple with first element a boolean indicating wheter dir
      rename was successful or not

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  # NOTE(review): several lines appear to have been lost from this view:
  # the body of the validity check below, the "try:" matching the dangling
  # "except OSError", the "else:" headers before the two trailing
  # logging.error branches, and the function's return values.
  if not old_file_storage_dir or not new_file_storage_dir:
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      os.rename(old_file_storage_dir, new_file_storage_dir)
    except OSError, err:
      logging.exception("Cannot rename '%s' to '%s'",
                        old_file_storage_dir, new_file_storage_dir)
    logging.error("'%s' is not a directory", old_file_storage_dir)
  if os.path.exists(old_file_storage_dir):
    logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                  old_file_storage_dir, new_file_storage_dir)
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @param file_name: the (absolute) file name to check
  @return: True if the file lives in the queue directory, False otherwise
      (an error is logged in the False case)

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # NOTE: commonprefix compares character-wise, not per path component, so
  # a sibling sharing a string prefix with the queue dir would also pass
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  @param file_name: absolute name of the file to write
  @param content: the new contents of the file
  @return: False if the file is outside the queue directory, True otherwise

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)

  return True
def JobQueueRename(old, new):
  """Renames a job queue file.

  @param old: absolute name of the existing file
  @param new: absolute name to rename it to
  @return: False if either name is outside the queue directory,
      True otherwise

  """
  # refuse to touch anything outside the queue directory
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  os.rename(old, new)

  return True
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: bool
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @return: always True

  """
  # the flag is represented by the existence of the drain file
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

  return True
1793 def CloseBlockDevices(disks):
1794 """Closes the given block devices.
1796 This means they will be switched to secondary mode (in case of DRBD).
1801 rd = _RecursiveFindBD(cf)
1803 return (False, "Can't find device %s" % cf)
1810 except errors.BlockDeviceError, err:
1811 msg.append(str(err))
1813 return (False, "Can't make devices secondary: %s" % ",".join(msg))
1815 return (True, "All devices secondary")
1818 def ValidateHVParams(hvname, hvparams):
1819 """Validates the given hypervisor parameters.
1821 @type hvname: string
1822 @param hvname: the hypervisor name
1823 @type hvparams: dict
1824 @param hvparams: the hypervisor parameters to be validated
1825 @rtype: tuple (bool, str)
1826 @return: tuple of (success, message)
1830 hv_type = hypervisor.GetHypervisor(hvname)
1831 hv_type.ValidateParameters(hvparams)
1832 return (True, "Validation passed")
1833 except errors.HypervisorError, err:
1834 return (False, str(err))
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  # only hook files whose names match this mask are considered runnable
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  # NOTE(review): a @staticmethod decorator appears to have been lost here —
  # the method takes no "self"/"cls" but is called as self.ExecHook(...)
  # in RunHooks below.
  def ExecHook(script, env):
    """Exec one hook script.

    Args:
      - script: the full path to the script
      - env: the environment with which to exec the script

    """
    # exec the process using subprocess and log the output
    fdstdin = open("/dev/null", "r")
    child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT, close_fds=True,
                             shell=False, cwd="/", env=env)
    # NOTE(review): the output initialisation and the "try:" matching the
    # "except" below appear to have been lost from this view.
    output = child.stdout.read(4096)
    child.stdout.close()
    except EnvironmentError, err:
      output += "Hook script error: %s" % str(err)
    # NOTE(review): the wait-retry structure (a loop with "try:" around
    # wait(), retrying on EINTR) appears truncated — the "except" below has
    # no matching "try" and the EINTR branch has no body.
    result = child.wait()
    except EnvironmentError, err:
      if err.errno == errno.EINTR:
    # try not to leak fds
    for fd in (fdstdin, ):
      # NOTE(review): the "try:"/fd.close() pair appears to have been lost.
      except EnvironmentError, err:
        # just log the error
        #logging.exception("Error while closing fd %s", fd)
    # a hook succeeded iff it exited with status zero
    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will not be usually overriden by child opcodes.

    """
    # NOTE(review): the "suffix" assignments for the pre/post branches and a
    # final "else:" appear to have been lost; as written the raise below
    # would fire for the post phase rather than for unknown phases.
    if phase == constants.HOOKS_PHASE_PRE:
    elif phase == constants.HOOKS_PHASE_POST:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    # NOTE(review): the "rr" list initialisation, the "try:" matching the
    # "except OSError" below (with its empty-directory fallback), the sort
    # of dir_contents and the final "return rr" appear to have been lost.
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
              self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        # NOTE(review): the "else:" branch structure around the ExecHook
        # call and the success/fail test between the rrval assignments
        # appears to have been lost.
        result, output = self.ExecHook(fname, env)
        rrval = constants.HKR_FAIL
        rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    Return value: tuple of:
      - run status (one of the IARUN_ constants)
      - stdout/stderr of the script (if it ran)
      - fail reason (as from utils.RunResult)

    """
    # NOTE(review): this FindFile call is truncated — its third argument
    # (the check predicate, presumably os.path.isfile) is missing.
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # the input data is handed to the script via a temporary file
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    # NOTE(review): the code writing idata to fd, the cleanup of the temp
    # file, and the "if result.failed:" guard before the failure return
    # below appear to have been lost from this view.
    result = utils.RunCmd([alloc_script, fin_name])
    # NOTE(review): this return is truncated — the final element (presumably
    # result.fail_reason) and the closing parenthesis are missing.
    return (constants.IARUN_FAILURE, result.stdout, result.stderr,

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  # cached device paths are expected to live under this prefix
  _DEV_PREFIX = "/dev/"
  # directory holding the per-device cache files
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  # NOTE(review): a @classmethod decorator appears to have been lost here
  # (the first parameter is "cls").
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    # NOTE(review): a "return fpath" appears to have been lost here.

  # NOTE(review): a @classmethod decorator appears to have been lost here.
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      # NOTE(review): an early "return" appears to have been lost here.
    fpath = cls._ConvertPath(dev_path)
    # NOTE(review): the computation of "state" from on_primary and the
    # "if iv_name is None:" header for the fallback below appear to have
    # been lost from this view.
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    # NOTE(review): the "try:" matching the "except" below is missing.
    utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      # cache updates are best-effort: failures are logged, not raised
      logging.exception("Can't update bdev cache for %s", dev_path)

  # NOTE(review): a @classmethod decorator appears to have been lost here.
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      # NOTE(review): an early "return" appears to have been lost here.
    fpath = cls._ConvertPath(dev_path)
    # NOTE(review): the "try:" matching the "except" below is missing.
    utils.RemoveFile(fpath)
    except EnvironmentError, err:
      # best-effort removal: failures are logged, not raised
      logging.exception("Can't update bdev cache for %s", dev_path)