4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon"""
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
  # NOTE(review): the enclosing "def _GetConfig():" header is not
  # visible in this excerpt; this returns a fresh simple config reader.
  return ssconf.SimpleConfigReader()
def _GetSshRunner(cluster_name):
  """Build an SSH runner scoped to the given cluster.

  @param cluster_name: name of the cluster, used by the runner for
      host key verification

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  Symlinks and subdirectories are left alone; only plain regular
  files are removed.

  @param path: the directory to clean
  @param exclude: list of files to be excluded; defaults to the empty
      list (the default is None rather than [] to avoid the shared
      mutable default argument pitfall)

  """
  if not os.path.isdir(path):
    return

  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
  # NOTE(review): the enclosing "def JobQueuePurge():" header is not
  # visible in this excerpt.
  """Removes job queue files and archived jobs.

  """
  # Keep the queue lock file, remove everything else in the queue
  # directory, then drop the archived jobs entirely.
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
  # NOTE(review): the "def GetMasterInfo():" header, the line obtaining
  # "cfg" (presumably via _GetConfig()) and the opening "try:" are not
  # visible in this excerpt.
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @return: (master_netdev, master_ip, master_name)

  """
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError, err:
    # log with full traceback; the error-path return/raise is on a line
    # not visible here
    logging.exception("Cluster configuration incomplete")
  return (master_netdev, master_ip, master_node)
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (if someone else has it, then it won't). Then, if the start_daemons
  parameter is True, it will also start the master daemons
  (ganeti-masterd and ganeti-rapi).

  @param start_daemons: whether to also start the master daemons

  """
  # NOTE(review): several guard/return lines (e.g. after the netdev
  # check, after the ping checks and on RunCmd failures) are not
  # visible in this excerpt.
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:

  # check whether the master IP already answers, and if so whether it
  # is us or somebody else who owns it
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    logging.error("Someone else has the master ip, not activating")

  # add the master IP as a /32 label alias on the master netdev
  result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  logging.error("Can't activate master IP: %s", result.output)

  # gratuitous ARP so neighbours update their caches quickly
  result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                         "-s", master_ip, master_ip])
  # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  for daemon in 'ganeti-masterd', 'ganeti-rapi':
    result = utils.RunCmd([daemon])
    logging.error("Can't start daemon %s: %s", daemon, result.output)
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. Then, if the stop_daemons parameter is True, it will also
  stop the master daemons (ganeti-masterd and ganeti-rapi).

  @param stop_daemons: whether to also stop the master daemons

  """
  # NOTE(review): the early-return after the netdev check and the
  # result.failed guard before the error log are not visible in this
  # excerpt.
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:

  # drop the /32 master IP alias from the master netdev
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  logging.error("Can't remove the master IP, error: %s", result.output)
  # but otherwise ignore the failure

  # stop/kill the rapi and the master daemon
  for daemon in constants.RAPI_PID, constants.MASTERD_PID:
    utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @param dsa: the DSA host private key
  @param dsapub: the DSA host public key
  @param rsa: the RSA host private key
  @param rsapub: the RSA host public key
  @param sshkey: the user SSH private key
  @param sshpub: the user SSH public key

  """
  # NOTE(review): the "try:" before GetUserFiles, its trailing
  # argument(s) and the error-path return are not visible in this
  # excerpt.
  sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
               (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
               (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
               (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
  except errors.OpExecError, err:
    logging.exception("Error while processing user ssh files")

  # private keys are written mode 0600 so only the owner can read them
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # restart sshd so the new host keys take effect
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
  # NOTE(review): the enclosing "def LeaveCluster():" header, two
  # "try:" lines and the matching f.close() cleanup are not visible in
  # this excerpt.
  """Cleans up the current node and prepares it to be removed from the cluster.

  """
  _CleanDirectory(constants.DATA_DIR)

    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")

    f = open(pub_key, 'r')
      # read at most 8 KiB of our own public key and drop it from the
      # authorized_keys file
      utils.RemoveAuthorizedKey(auth_keys, f.read(8192))

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different informations about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  # NOTE(review): the "outputarray = {}" initialisation, a try/finally
  # around the boot_id read with f.close(), and the final return are
  # not visible in this excerpt.
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # the boot id changes on every reboot; callers can use it to detect
  # node restarts
  f = open("/proc/sys/kernel/random/boot_id", 'r')
    outputarray["bootid"] = f.read(128).rstrip("\n")
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  they report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @param cluster_name: the cluster name, used for the ssh checks

  """
  # NOTE(review): the "result = {}" initialisation, several success
  # checks and the final return are not visible in this excerpt.
  if 'hypervisor' in what:
    result['hypervisor'] = my_dict = {}
    for hv_name in what['hypervisor']:
      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    # shuffle so all nodes do not probe the same targets in the same
    # order
    random.shuffle(what['nodelist'])
    for node in what['nodelist']:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      result['nodelist'][node] = message
  if 'node-net-test' in what:
    result['node-net-test'] = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # first pass: find our own primary/secondary IPs in the node list
    for name, pip, sip in what['node-net-test']:
      result['node-net-test'][my_name] = ("Can't find my own"
                                          " primary/secondary IP"
    port = utils.GetNodeDaemonPort()
    # second pass: ping every node on both IPs from our matching source
    for name, pip, sip in what['node-net-test']:
      if not utils.TcpPing(pip, port, source=my_pip):
        fail.append("primary")
      if not utils.TcpPing(sip, port, source=my_sip):
        fail.append("secondary")
      result['node-net-test'][name] = ("failure using the %s"
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @param vg_name: the volume group to list
  @return: dictionary of all partions (key) with their size (in MiB),
      inactive and online status, e.g.::

        {'test1': ('20.06', True, True)}

  """
  # NOTE(review): the "lvs = {}" accumulator, the "sep" definition, the
  # result.failed guards and the final return are not visible in this
  # excerpt.
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  logging.error("Failed to list logical volumes, lvs output: %s",

  # name | size | 6-char attribute string, separated by '|'
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    match = valid_line_re.match(line)
    logging.error("Invalid line returned from lvs output: '%s'", line)
    name, size, attr = match.groups()
    # lv_attr positions: 5th char '-' means inactive, 6th char 'o'
    # means open/online
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)
def ListVolumeGroups():
  """List the volume groups and their size.

  @return: dictionary with keys volume name and values the size of
      the volume

  """
  # Thin wrapper: all the work happens in the utils helper.
  groups = utils.ListVolumeGroups()
  return groups
  # NOTE(review): the enclosing "def NodeVolumes():" header, the
  # result.failed guard, and the "def parse_dev"/"def map_line" helper
  # headers are not visible in this excerpt.
  """List all volumes on this node.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--options=lv_name,lv_size,devices,vg_name"])
  logging.error("Failed to list logical volumes, lvs output: %s",

    # strip the "(extent)" suffix lvs appends to device names
    return dev.split('(')[0]

      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),

  # only keep lines with at least four '|'-separated fields
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @param bridges_list: list of bridge names to check for
  @rtype: boolean
  @return: True if all of them exist, false otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      # one missing bridge is enough to fail the whole check
      return False

  return True
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
      - instance1.example.com
      - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError:
      # fixed log-message typo: "hypevisor" -> "hypervisor"
      logging.exception("Error enumerating instances for hypervisor %s", hname)
      # FIXME: should we somehow not propagate this to the master?
      raise

  return results
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  # NOTE(review): the "output = {}" initialisation and the final
  # return are not visible in this excerpt.
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # iinfo is a tuple; indices 2/4/5 presumably are (memory, state,
    # cpu time) per the hypervisor API -- confirm against the
    # hypervisor module
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of `GetInstanceInfo()`, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict of dicts
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpuus: the number of vcpus

  """
  # NOTE(review): the "output = {}" initialisation, the construction of
  # the per-instance "value" dict and the final return are not visible
  # in this excerpt.
  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
      for name, inst_id, memory, vcpus, state, times in iinfo:
        # the same instance reported by two hypervisors with different
        # data is a hard error
        if name in output and output[name] != value:
          raise errors.HypervisorError("Instance %s running duplicate"
                                       " with different parameters" % name)
def AddOSToInstance(instance):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed

  """
  # NOTE(review): the result.failed guard and the return statements
  # are not visible in this excerpt.
  inst_os = OSFromDisk(instance.os)

  create_script = inst_os.create_script
  create_env = OSEnvironment(instance)

  # per-run log file named after os/instance/timestamp
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  # "&>" redirects both stdout and stderr of the OS script to the log
  command = utils.BuildShellCmd("cd %s && %s &>%s",
                                inst_os.path, create_script, logfile)

  result = utils.RunCmd(command, env=create_env)
  logging.error("os create command '%s' returned error: %s, logfile: %s,"
                " output: %s", command, result.fail_reason, logfile,
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: objects.Instance
  @param instance: Instance whose OS script is to be run
  @type old_name: string
  @param old_name: previous instance name

  """
  # NOTE(review): the result.failed guard and the return statements
  # are not visible in this excerpt.
  inst_os = OSFromDisk(instance.os)

  script = inst_os.rename_script
  rename_env = OSEnvironment(instance)
  # the script learns the previous name through the environment
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  # "&>" redirects both stdout and stderr of the OS script to the log
  command = utils.BuildShellCmd("cd %s && %s &>%s",
                                inst_os.path, script, logfile)

  result = utils.RunCmd(command, env=rename_env)

  logging.error("os create command '%s' returned error: %s output: %s",
                command, result.fail_reason, result.output)
def _GetVGInfo(vg_name):
  """Get informations about the volume group.

  Args:
    vg_name: the volume group

  Returns:
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
    where
    vg_size is the total size of the volume group in MiB
    vg_free is the free size of the volume group in MiB
    pv_count are the number of physical disks in that vg

  If an error occurs during gathering of data, we return the same dict
  with keys all set to None.

  """
  # NOTE(review): the "try:" lines, the field-count check and the
  # final return are not visible in this excerpt.
  # default answer: every key present but None, used on all error paths
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  logging.error("volume group %s not present", vg_name)
  # output looks like " size:free:count"; strip the trailing separator
  # then split on ':'
  valarr = retval.stdout.strip().rstrip(':').split(':')
      # sizes come back as floats in MiB; round to whole MiB
      "vg_size": int(round(float(valarr[0]), 0)),
      "vg_free": int(round(float(valarr[1]), 0)),
      "pv_count": int(valarr[2]),
  except ValueError, err:
    logging.exception("Fail to parse vgs output")
  logging.error("vgs output has the wrong number of fields (expected"
                " three): %s", str(valarr))
def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @param instance: the instance whose disks are looked up
  @raise errors.BlockDeviceError: when a disk cannot be found

  """
  # NOTE(review): the "block_devices = []" initialisation, the "is
  # None" check and the final return are not visible in this excerpt.
  for disk in instance.disks:
    device = _RecursiveFindBD(disk)
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
    block_devices.append((disk, device))
def StartInstance(instance, extra_args):
  """Start an instance.

  @type instance: instance object
  @param instance: the instance object
  @param extra_args: extra arguments passed through to the hypervisor

  @return: whether the startup was successful or not

  """
  # NOTE(review): the early return when already running, the "try:"
  # before StartInstance and the return statements are not visible in
  # this excerpt.
  running_instances = GetInstanceList([instance.hypervisor])

  # starting an already-running instance is treated as success
  if instance.name in running_instances:

  block_devices = _GatherBlockDevs(instance)
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

    hyper.StartInstance(instance, block_devices, extra_args)
  except errors.HypervisorError, err:
    logging.exception("Failed to start instance")
def ShutdownInstance(instance):
  """Shut an instance down.

  First a soft stop is attempted; if the instance is still running
  after a grace period, it is destroyed forcibly.

  @type instance: instance object
  @param instance: the instance object

  @return: whether the shutdown was successful or not

  """
  # NOTE(review): several "try:", sleep and return lines are not
  # visible in this excerpt.
  hv_name = instance.hypervisor
  running_instances = GetInstanceList([hv_name])

  # shutting down a non-running instance is treated as success
  if instance.name not in running_instances:

  hyper = hypervisor.GetHypervisor(hv_name)

    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logging.error("Failed to stop instance")

  # test every 10secs for 2min

  for dummy in range(11):
    if instance.name not in GetInstanceList([hv_name]):

    # the shutdown did not succeed
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)

      # escalate to a forced stop
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      logging.exception("Failed to stop instance")

    if instance.name in GetInstanceList([hv_name]):
      logging.error("could not shutdown instance '%s' even by destroy",
def RebootInstance(instance, reboot_type, extra_args):
  """Reboot an instance.

  Args:
    instance - name of instance to reboot
    reboot_type - how to reboot [soft,hard,full]
    extra_args - extra arguments passed through on a hard reboot

  """
  # NOTE(review): the "try:" lines, return statements and the "full"
  # reboot branch are not visible in this excerpt.
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
      # soft: ask the hypervisor to reboot in place
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      logging.exception("Failed to soft reboot instance")
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
      # hard: full stop followed by a fresh start
      ShutdownInstance(instance)
      StartInstance(instance, extra_args)
    except errors.HypervisorError, err:
      logging.exception("Failed to hard reboot instance")
    raise errors.ParameterError("reboot_type invalid")
def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: C{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)

  @rtype: tuple
  @return: a tuple of (success, msg) where:
      - success is a boolean denoting the success/failure of the operation
      - msg is a string with details in case of failure

  """
  # NOTE(review): the "try:" before MigrateInstance and the failure
  # return are not visible in this excerpt.
  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)

    hyper.MigrateInstance(instance.name, target, live)
  except errors.HypervisorError, err:
    msg = "Failed to migrate instance: %s" % str(err)
  return (True, "Migration successfull")
def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  Args:
    disk: a ganeti.objects.Disk object
    size: the size of the physical underlying device
    owner: a string with the name of the instance
    on_primary: a boolean indicating if it is the primary node or not
    info: string that will be sent to the physical device creation

  Returns:
    the new unique_id of the device (this can sometime be
    computed only after creation), or None. On secondary nodes,
    it's not required to return anything.

  """
  # NOTE(review): the "clist = []" initialisation, several "try:"
  # lines, device.Remove()/SetInfo() calls and the final return are
  # not visible in this excerpt.
  for child in disk.children:
    crdev = _RecursiveAssembleBD(child, owner, on_primary)
    if on_primary or disk.AssembleOnSecondary():
      # we need the children open in case the device itself has to
      # be assembled

    # check whether a device with the same unique_id already exists
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
    if device is not None:
      logging.info("removing existing device %s", disk)
  except errors.BlockDeviceError, err:

  device = bdev.Create(disk.dev_type, disk.physical_id,
    raise ValueError("Can't create child device for %s, %s" %
  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation"
      logging.error(errorstring)
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                    " daemon logs" % errorstring)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)
    # remember the mapping for later lookups
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  physical_id = device.unique_id
def RemoveBlockDevice(disk):
  """Remove a block device.

  This is intended to be called recursively.

  @param disk: the objects.Disk to remove, children included

  """
  # NOTE(review): the "try:" line, the rdev None branch and the final
  # return are not visible in this excerpt.
    # since we are removing the device, allow a partial match
    # this allows removal of broken mirrors
    rdev = _RecursiveFindBD(disk, allow_partial=True)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    r_path = rdev.dev_path
    result = rdev.Remove()
      # drop the cached mapping once the device is gone
      DevCacheManager.RemoveCache(r_path)

  for child in disk.children:
    # the overall result is the conjunction of all child removals
    result = result and RemoveBlockDevice(child)
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  This function is called recursively.

  Args:
    disk: a objects.Disk object
    owner: the name of the instance owning the device
    as_primary: if we should make the block device read/write

  Returns:
    the assembled device or None (in case no device was assembled)

  If the assembly is not successful, an exception is raised.

  """
  # NOTE(review): the "children = []" initialisation, the "try:" line
  # and the return statements are not visible in this excerpt.
    mcn = disk.ChildrenNeeded()
    mcn = 0  # max number of Nones allowed
    mcn = len(disk.children) - mcn  # max number of Nones
  for chld_disk in disk.children:
      cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
    except errors.BlockDeviceError, err:
      # tolerate failed children up to the mcn budget, re-raising only
      # when too many are missing
      if children.count(None) >= mcn:
      logging.debug("Error in child activation: %s", str(err))
    children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    if as_primary or disk.OpenOnSecondary():
      DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                  as_primary, disk.iv_name)
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  Returns:
    a /dev path for primary nodes
    True for secondary nodes

  """
  # NOTE(review): the final return is not visible in this excerpt.
  result = _RecursiveAssembleBD(disk, owner, as_primary)
  # on primary nodes the caller wants the device path, not the object
  if isinstance(result, bdev.BlockDev):
    result = result.dev_path
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  """
  # NOTE(review): the result initialisation for the not-found branch
  # and the final return are not visible in this excerpt.
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
      # drop the cached mapping once the device is down
      DevCacheManager.RemoveCache(r_path)

  for child in disk.children:
    # the overall result is the conjunction of all child shutdowns
    result = result and ShutdownBlockDevice(child)
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @param parent_cdev: the disk whose mirror is being extended
  @param new_cdevs: the disks to add as new mirror children

  """
  # NOTE(review): the failure returns and the final return are not
  # visible in this excerpt.
  # allow a partial match: the parent may be a degraded mirror
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent_bdev is None:
    logging.error("Can't find parent device")
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @param parent_cdev: the disk whose mirror is being shrunk
  @param new_cdevs: the child disks to remove from the mirror

  """
  # NOTE(review): the "devs = []" initialisation, the static-path
  # branch and the return statements are not visible in this excerpt.
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
  for disk in new_cdevs:
    # prefer the static device path if the disk type has one,
    # otherwise resolve the device dynamically
    rpath = disk.StaticDevPath()
      bd = _RecursiveFindBD(disk)
        logging.error("Can't find dynamic device %s while removing children",
      devs.append(bd.dev_path)
  parent_bdev.RemoveChildren(devs)
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  """
  # NOTE(review): the "stats = []" initialisation, the loop header and
  # the final return are not visible in this excerpt.
    rbd = _RecursiveFindBD(dsk)
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
    stats.append(rbd.CombinedSyncStatus())
def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  # NOTE(review): the "children = []" initialisation is not visible in
  # this excerpt.
  for chdisk in disk.children:
    # resolve all children first (strict: allow_partial is not
    # forwarded to the recursive call)
    children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  # NOTE(review): the None-check/early-return between the find and the
  # final return is not visible in this excerpt.
  rbd = _RecursiveFindBD(disk)
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @param file_name: absolute target path; must be in the allowed list
  @param data: the new file contents
  @param mode: the permission bits to set
  @param uid: owner user id
  @param gid: owner group id
  @param atime: access time to set
  @param mtime: modification time to set

  """
  # NOTE(review): the failure returns, the start of the allowed_files
  # list and the final return are not visible in this excerpt.
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,

  # refuse anything outside the whitelist: this endpoint can overwrite
  # arbitrary contents, so the target set must stay closed
  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the `err` argument has an errno attribute, it will be looked up
  and converted into a textual EXXXX description. Otherwise the string
  representation of the error will be returned.

  @param err: the exception to format
  @rtype: string
  @return: the symbolic errno name or str(err)

  """
  if hasattr(err, 'errno'):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail
def _OSOndiskVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the os given by
  the 'name' parameter and residing in the 'os_dir' directory.

  Return value will be either an integer denoting the version or None in the
  case when this is not a valid OS name.

  @raise errors.InvalidOS: when the version file is missing, not a
      regular file, unreadable or does not contain integers

  """
  # NOTE(review): several lines (the "try:" wrappers, the file
  # open/close and the final return) are not visible in this excerpt.
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"

    # one supported API version per line
    api_versions = f.readlines()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  api_versions = [version.strip() for version in api_versions]
    api_versions = [int(version) for version in api_versions]
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  Returns an OS object for each name in all the given top directories
  (if not given defaults to constants.OS_SEARCH_PATH)

  @param top_dirs: optional list of directories to scan instead of the
      default search path

  """
  # NOTE(review): the "result = []" initialisation, the "try:" lines
  # and the final return are not visible in this excerpt.
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s", dir_name)
      for name in f_names:
          os_inst = OSFromDisk(name, base_dir=dir_name)
          result.append(os_inst)
        except errors.InvalidOS, err:
          # invalid OSes are reported, not skipped, so the caller can
          # show the failure reason
          result.append(objects.OS.FromInvalidOS(err))
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  `errors.InvalidOS` exception, detailing why this is not a valid
  OS.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.

  """
  # NOTE(review): a few lines (the FindFile result check, the script
  # mode/regular-file error arguments) are not visible in this
  # excerpt.
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
    os_dir = os.path.sep.join([base_dir, name])

  api_versions = _OSOndiskVersion(name, os_dir)

  if constants.OS_API_VERSION not in api_versions:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_versions, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    # every script must be executable by the owner...
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %

    # ...and a regular file (no directories, sockets, etc.)
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %

  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
                    api_versions=api_versions)
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  @type instance: instance object
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for os api 10)
  @rtype: dict
  @return: dict of environment variables

  """
  # NOTE(review): the "result = {}" initialisation and the final
  # return are not visible in this excerpt.
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
  result['INSTANCE_NAME'] = instance.name
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    # FIXME: When disks will have read-only mode, populate this
    result['DISK_%d_ACCESS' % idx] = 'W'
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      # file-backed disks also expose the backing file path
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]
def GrowBlockDevice(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first ones to resize.

  Args:
    disk: the disk to be grown
    amount: the amount (in MiB) to grow by

  Returns: a tuple of (status, result), with:
    status: the result (true/false) of the operation
    result: the error message if the operation failed, otherwise not used

  """
  # NOTE(review): the "try:" line, the Grow() call itself and the
  # success return are not visible in this excerpt.
  r_dev = _RecursiveFindBD(disk)
    return False, "Cannot find block device %s" % (disk,)

  except errors.BlockDeviceError, err:
    return False, str(err)
1299 def SnapshotBlockDevice(disk):
1300 """Create a snapshot copy of a block device.
1302 This function is called recursively, and the snapshot is actually created
1303 just for the leaf lvm backend device.
1305 @type disk: L{objects.Disk}
1306 @param disk: the disk to be snapshotted
1308 @return: snapshot disk path
1312 if len(disk.children) == 1:
1313 # only one child, let's recurse on it
1314 return SnapshotBlockDevice(disk.children[0])
1316 # more than one child, choose one that matches
1317 for child in disk.children:
1318 if child.size == disk.size:
1319 # return implies breaking the loop
1320 return SnapshotBlockDevice(child)
1321 elif disk.dev_type == constants.LD_LV:
1322 r_dev = _RecursiveFindBD(disk)
1323 if r_dev is not None:
1324 # let's stay on the safe side and ask for the full size, for now
1325 return r_dev.Snapshot(disk.size)
1329 raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1330 " '%s' of type '%s'" %
1331 (disk.unique_id, disk.dev_type))
1334 def ExportSnapshot(disk, dest_node, instance, cluster_name):
1335 """Export a block device snapshot to a remote node.
1338 disk: the snapshot block device
1339 dest_node: the node to send the image to
1340 instance: instance being exported
1343 True if successful, False otherwise.
1346 export_env = OSEnvironment(instance)
1348 inst_os = OSFromDisk(instance.os)
1349 export_script = inst_os.export_script
1351 logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1352 instance.name, int(time.time()))
1353 if not os.path.exists(constants.LOG_OS_DIR):
1354 os.mkdir(constants.LOG_OS_DIR, 0750)
1355 real_disk = _RecursiveFindBD(disk)
1356 if real_disk is None:
1357 raise errors.BlockDeviceError("Block device '%s' is not set up" %
1361 export_env['EXPORT_DEVICE'] = real_disk.dev_path
1363 destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1364 destfile = disk.physical_id[1]
1366 # the target command is built out of three individual commands,
1367 # which are joined by pipes; we check each individual command for
1369 expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1370 export_script, logfile)
1374 destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1375 destdir, destdir, destfile)
1376 remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1377 constants.GANETI_RUNAS,
1380 # all commands have been checked, so we're safe to combine them
1381 command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1383 result = utils.RunCmd(command, env=export_env)
1386 logging.error("os snapshot export command '%s' returned error: %s"
1387 " output: %s", command, result.fail_reason, result.output)
1393 def FinalizeExport(instance, snap_disks):
1394 """Write out the export configuration information.
1397 instance: instance configuration
1398 snap_disks: snapshot block devices
1401 False in case of error, True otherwise.
1404 destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1405 finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1407 config = objects.SerializableConfigParser()
1409 config.add_section(constants.INISECT_EXP)
1410 config.set(constants.INISECT_EXP, 'version', '0')
1411 config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1412 config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1413 config.set(constants.INISECT_EXP, 'os', instance.os)
1414 config.set(constants.INISECT_EXP, 'compression', 'gzip')
1416 config.add_section(constants.INISECT_INS)
1417 config.set(constants.INISECT_INS, 'name', instance.name)
1418 config.set(constants.INISECT_INS, 'memory', '%d' %
1419 instance.beparams[constants.BE_MEMORY])
1420 config.set(constants.INISECT_INS, 'vcpus', '%d' %
1421 instance.beparams[constants.BE_VCPUS])
1422 config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1425 for nic_count, nic in enumerate(instance.nics):
1426 config.set(constants.INISECT_INS, 'nic%d_mac' %
1427 nic_count, '%s' % nic.mac)
1428 config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1429 config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1431 # TODO: redundant: on load can read nics until it doesn't exist
1432 config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1435 for disk_count, disk in enumerate(snap_disks):
1437 config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1438 ('%s' % disk.iv_name))
1439 config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1440 ('%s' % disk.physical_id[1]))
1441 config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1443 config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1445 cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1446 cfo = open(cff, 'w')
1452 shutil.rmtree(finaldestdir, True)
1453 shutil.move(destdir, finaldestdir)
1458 def ExportInfo(dest):
1459 """Get export configuration information.
1462 dest: directory containing the export
1465 A serializable config file containing the export info.
1468 cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1470 config = objects.SerializableConfigParser()
1473 if (not config.has_section(constants.INISECT_EXP) or
1474 not config.has_section(constants.INISECT_INS)):
1480 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1481 """Import an os image into an instance.
1483 @type instance: L{objects.instance}
1484 @param instance: instance to import the disks into
1485 @type src_node: string
1486 @param src_node: source node for the disk images
1487 @type src_images: list of string
1488 @param src_images: absolute paths of the disk images
1489 @rtype: list of boolean
1490 @return: each boolean represent the success of importing the n-th disk
1493 import_env = OSEnvironment(instance)
1494 inst_os = OSFromDisk(instance.os)
1495 import_script = inst_os.import_script
1497 logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1498 instance.name, int(time.time()))
1499 if not os.path.exists(constants.LOG_OS_DIR):
1500 os.mkdir(constants.LOG_OS_DIR, 0750)
1503 impcmd = utils.BuildShellCmd("(cd %s; %s &>%s)", inst_os.path, import_script,
1507 for idx, image in enumerate(src_images):
1509 destcmd = utils.BuildShellCmd('cat %s', image)
1510 remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1511 constants.GANETI_RUNAS,
1513 command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1514 import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1515 result = utils.RunCmd(command, env=import_env)
1517 logging.error("disk import command '%s' returned error: %s"
1518 " output: %s", command, result.fail_reason, result.output)
1519 final_result.append(False)
1521 final_result.append(True)
1523 final_result.append(True)
1529 """Return a list of exports currently available on this machine.
1532 if os.path.isdir(constants.EXPORT_DIR):
1533 return utils.ListVisibleFiles(constants.EXPORT_DIR)
1538 def RemoveExport(export):
1539 """Remove an existing export from the node.
1542 export: the name of the export to remove
1545 False in case of error, True otherwise.
1548 target = os.path.join(constants.EXPORT_DIR, export)
1550 shutil.rmtree(target)
1551 # TODO: catch some of the relevant exceptions and provide a pretty
1552 # error message if rmtree fails.
1557 def RenameBlockDevices(devlist):
1558 """Rename a list of block devices.
1560 The devlist argument is a list of tuples (disk, new_logical,
1561 new_physical). The return value will be a combined boolean result
1562 (True only if all renames succeeded).
1566 for disk, unique_id in devlist:
1567 dev = _RecursiveFindBD(disk)
1572 old_rpath = dev.dev_path
1573 dev.Rename(unique_id)
1574 new_rpath = dev.dev_path
1575 if old_rpath != new_rpath:
1576 DevCacheManager.RemoveCache(old_rpath)
1577 # FIXME: we should add the new cache information here, like:
1578 # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1579 # but we don't have the owner here - maybe parse from existing
1580 # cache? for now, we only lose lvm data when we rename, which
1581 # is less critical than DRBD or MD
1582 except errors.BlockDeviceError, err:
1583 logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1588 def _TransformFileStorageDir(file_storage_dir):
1589 """Checks whether given file_storage_dir is valid.
1591 Checks wheter the given file_storage_dir is within the cluster-wide
1592 default file_storage_dir stored in SimpleStore. Only paths under that
1593 directory are allowed.
1596 file_storage_dir: string with path
1599 normalized file_storage_dir (string) if valid, None otherwise
1603 file_storage_dir = os.path.normpath(file_storage_dir)
1604 base_file_storage_dir = cfg.GetFileStorageDir()
1605 if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1606 base_file_storage_dir):
1607 logging.error("file storage directory '%s' is not under base file"
1608 " storage directory '%s'",
1609 file_storage_dir, base_file_storage_dir)
1611 return file_storage_dir
1614 def CreateFileStorageDir(file_storage_dir):
1615 """Create file storage directory.
1618 file_storage_dir: string containing the path
1621 tuple with first element a boolean indicating wheter dir
1622 creation was successful or not
1625 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1627 if not file_storage_dir:
1630 if os.path.exists(file_storage_dir):
1631 if not os.path.isdir(file_storage_dir):
1632 logging.error("'%s' is not a directory", file_storage_dir)
1636 os.makedirs(file_storage_dir, 0750)
1637 except OSError, err:
1638 logging.error("Cannot create file storage directory '%s': %s",
1639 file_storage_dir, err)
1644 def RemoveFileStorageDir(file_storage_dir):
1645 """Remove file storage directory.
1647 Remove it only if it's empty. If not log an error and return.
1650 file_storage_dir: string containing the path
1653 tuple with first element a boolean indicating wheter dir
1654 removal was successful or not
1657 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1659 if not file_storage_dir:
1662 if os.path.exists(file_storage_dir):
1663 if not os.path.isdir(file_storage_dir):
1664 logging.error("'%s' is not a directory", file_storage_dir)
1666 # deletes dir only if empty, otherwise we want to return False
1668 os.rmdir(file_storage_dir)
1669 except OSError, err:
1670 logging.exception("Cannot remove file storage directory '%s'",
1676 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1677 """Rename the file storage directory.
1680 old_file_storage_dir: string containing the old path
1681 new_file_storage_dir: string containing the new path
1684 tuple with first element a boolean indicating wheter dir
1685 rename was successful or not
1688 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1689 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1691 if not old_file_storage_dir or not new_file_storage_dir:
1694 if not os.path.exists(new_file_storage_dir):
1695 if os.path.isdir(old_file_storage_dir):
1697 os.rename(old_file_storage_dir, new_file_storage_dir)
1698 except OSError, err:
1699 logging.exception("Cannot rename '%s' to '%s'",
1700 old_file_storage_dir, new_file_storage_dir)
1703 logging.error("'%s' is not a directory", old_file_storage_dir)
1706 if os.path.exists(old_file_storage_dir):
1707 logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1708 old_file_storage_dir, new_file_storage_dir)
1713 def _IsJobQueueFile(file_name):
1714 """Checks whether the given filename is in the queue directory.
1717 queue_dir = os.path.normpath(constants.QUEUE_DIR)
1718 result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
1721 logging.error("'%s' is not a file in the queue directory",
1727 def JobQueueUpdate(file_name, content):
1728 """Updates a file in the queue directory.
1731 if not _IsJobQueueFile(file_name):
1734 # Write and replace the file atomically
1735 utils.WriteFile(file_name, data=content)
1740 def JobQueueRename(old, new):
1741 """Renames a job queue file.
1744 if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
1752 def JobQueueSetDrainFlag(drain_flag):
1753 """Set the drain flag for the queue.
1755 This will set or unset the queue drain flag.
1757 @type drain_flag: bool
1758 @param drain_flag: if True, will set the drain flag, otherwise reset it.
1762 utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1764 utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1769 def CloseBlockDevices(disks):
1770 """Closes the given block devices.
1772 This means they will be switched to secondary mode (in case of DRBD).
1777 rd = _RecursiveFindBD(cf)
1779 return (False, "Can't find device %s" % cf)
1786 except errors.BlockDeviceError, err:
1787 msg.append(str(err))
1789 return (False, "Can't make devices secondary: %s" % ",".join(msg))
1791 return (True, "All devices secondary")
1794 def ValidateHVParams(hvname, hvparams):
1795 """Validates the given hypervisor parameters.
1797 @type hvname: string
1798 @param hvname: the hypervisor name
1799 @type hvparams: dict
1800 @param hvparams: the hypervisor parameters to be validated
1801 @rtype: tuple (bool, str)
1802 @return: tuple of (success, message)
1806 hv_type = hypervisor.GetHypervisor(hvname)
1807 hv_type.ValidateParameters(hvparams)
1808 return (True, "Validation passed")
1809 except errors.HypervisorError, err:
1810 return (False, str(err))
1813 class HooksRunner(object):
1816 This class is instantiated on the node side (ganeti-noded) and not on
1820 RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1822 def __init__(self, hooks_base_dir=None):
1823 """Constructor for hooks runner.
1826 - hooks_base_dir: if not None, this overrides the
1827 constants.HOOKS_BASE_DIR (useful for unittests)
1830 if hooks_base_dir is None:
1831 hooks_base_dir = constants.HOOKS_BASE_DIR
1832 self._BASE_DIR = hooks_base_dir
1835 def ExecHook(script, env):
1836 """Exec one hook script.
1839 - script: the full path to the script
1840 - env: the environment with which to exec the script
1843 # exec the process using subprocess and log the output
1846 fdstdin = open("/dev/null", "r")
1847 child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1848 stderr=subprocess.STDOUT, close_fds=True,
1849 shell=False, cwd="/", env=env)
1852 output = child.stdout.read(4096)
1853 child.stdout.close()
1854 except EnvironmentError, err:
1855 output += "Hook script error: %s" % str(err)
1859 result = child.wait()
1861 except EnvironmentError, err:
1862 if err.errno == errno.EINTR:
1866 # try not to leak fds
1867 for fd in (fdstdin, ):
1871 except EnvironmentError, err:
1872 # just log the error
1873 #logging.exception("Error while closing fd %s", fd)
1876 return result == 0, output
1878 def RunHooks(self, hpath, phase, env):
1879 """Run the scripts in the hooks directory.
1881 This method will not be usually overriden by child opcodes.
1884 if phase == constants.HOOKS_PHASE_PRE:
1886 elif phase == constants.HOOKS_PHASE_POST:
1889 raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1892 subdir = "%s-%s.d" % (hpath, suffix)
1893 dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1895 dir_contents = utils.ListVisibleFiles(dir_name)
1896 except OSError, err:
1900 # we use the standard python sort order,
1901 # so 00name is the recommended naming scheme
1903 for relname in dir_contents:
1904 fname = os.path.join(dir_name, relname)
1905 if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1906 self.RE_MASK.match(relname) is not None):
1907 rrval = constants.HKR_SKIP
1910 result, output = self.ExecHook(fname, env)
1912 rrval = constants.HKR_FAIL
1914 rrval = constants.HKR_SUCCESS
1915 rr.append(("%s/%s" % (subdir, relname), rrval, output))
1920 class IAllocatorRunner(object):
1921 """IAllocator runner.
1923 This class is instantiated on the node side (ganeti-noded) and not on
1927 def Run(self, name, idata):
1928 """Run an iallocator script.
1930 Return value: tuple of:
1931 - run status (one of the IARUN_ constants)
1934 - fail reason (as from utils.RunResult)
1937 alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
1939 if alloc_script is None:
1940 return (constants.IARUN_NOTFOUND, None, None, None)
1942 fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
1946 result = utils.RunCmd([alloc_script, fin_name])
1948 return (constants.IARUN_FAILURE, result.stdout, result.stderr,
1953 return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1956 class DevCacheManager(object):
1957 """Simple class for managing a cache of block device information.
1960 _DEV_PREFIX = "/dev/"
1961 _ROOT_DIR = constants.BDEV_CACHE_DIR
1964 def _ConvertPath(cls, dev_path):
1965 """Converts a /dev/name path to the cache file name.
1967 This replaces slashes with underscores and strips the /dev
1968 prefix. It then returns the full path to the cache file
1971 if dev_path.startswith(cls._DEV_PREFIX):
1972 dev_path = dev_path[len(cls._DEV_PREFIX):]
1973 dev_path = dev_path.replace("/", "_")
1974 fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
1978 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
1979 """Updates the cache information for a given device.
1982 if dev_path is None:
1983 logging.error("DevCacheManager.UpdateCache got a None dev_path")
1985 fpath = cls._ConvertPath(dev_path)
1991 iv_name = "not_visible"
1992 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
1994 utils.WriteFile(fpath, data=fdata)
1995 except EnvironmentError, err:
1996 logging.exception("Can't update bdev cache for %s", dev_path)
1999 def RemoveCache(cls, dev_path):
2000 """Remove data for a dev_path.
2003 if dev_path is None:
2004 logging.error("DevCacheManager.RemoveCache got a None dev_path")
2006 fpath = cls._ConvertPath(dev_path)
2008 utils.RemoveFile(fpath)
2009 except EnvironmentError, err:
2010 logging.exception("Can't update bdev cache for %s", dev_path)