4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon"""
import errno
import logging
import os
import os.path
import random
import re
import stat
import time

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
def _GetConfig():
  """Build and return a configuration reader.

  @rtype: L{ssconf.SimpleConfigReader}
  @return: a reader for the locally-stored cluster configuration

  """
  return ssconf.SimpleConfigReader()
def _GetSshRunner(cluster_name):
  """Build an SSH runner bound to the given cluster.

  @param cluster_name: the cluster name, used by the runner for
      host key verification

  """
  return ssh.SshRunner(cluster_name)
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  Symlinks and anything that is not a plain file are left alone.

  @param path: the directory to clean; a non-directory path is a no-op
  @param exclude: list of files to be excluded; defaults to the empty
      list (the original used a mutable default argument, which is
      shared between calls and therefore a bug)

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths so the membership test below matches
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  The queue lock file is excluded from the cleanup so that the job
  queue lock held by the master stays valid.

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
85 """Returns master information.
87 This is an utility function to compute master information, either
88 for consumption here or from the node daemon.
91 @return: (master_netdev, master_ip, master_name)
96 master_netdev = cfg.GetMasterNetdev()
97 master_ip = cfg.GetMasterIP()
98 master_node = cfg.GetMasterNode()
99 except errors.ConfigurationError, err:
100 logging.exception("Cluster configuration incomplete")
102 return (master_netdev, master_ip, master_node)
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try to activate the IP address of the master
  (if someone else has it, then it won't). Then, if the start_daemons
  parameter is True, it will also start the master daemons
  (ganeti-masterd and ganeti-rapi).

  @param start_daemons: whether to also start the master daemons
  @return: True unless some step failed

  """
  ok = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. Then, if the stop_daemons parameter is True, it will also
  stop the master daemons (ganeti-masterd and ganeti-rapi).

  @param stop_daemons: whether to also stop the master daemons
  @return: True unless the master information can't be computed

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True
174 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
175 """Joins this node to the cluster.
177 This does the following:
178 - updates the hostkeys of the machine (rsa and dsa)
179 - adds the ssh private key to the user
180 - adds the ssh public key to the users' authorized_keys file
183 sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
184 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
185 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
186 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
187 for name, content, mode in sshd_keys:
188 utils.WriteFile(name, data=content, mode=mode)
191 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
193 except errors.OpExecError, err:
194 logging.exception("Error while processing user ssh files")
197 for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
198 utils.WriteFile(name, data=content, mode=0600)
200 utils.AddAuthorizedKey(auth_keys, sshpub)
202 utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
def LeaveCluster():
  """Cleans up the current node and prepares it to be removed from the cluster.

  Removes the locally-stored configuration, the user ssh keys and the
  key from authorized_keys, then schedules the node daemon shutdown by
  raising L{errors.QuitGanetiException}.

  """
  _CleanDirectory(constants.DATA_DIR)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  f = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different informations about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # the boot id changes on every boot, so it identifies a node "session"
  f = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = f.read(128).rstrip("\n")
  finally:
    f.close()

  return outputarray
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @param cluster_name: the cluster's name, for ssh host key checking
  @return: a dict with the results keyed like the input

  """
  result = {}

  if 'hypervisor' in what:
    result['hypervisor'] = my_dict = {}
    for hv_name in what['hypervisor']:
      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    # shuffle so not all nodes hammer the same targets in the same order
    random.shuffle(what['nodelist'])
    for node in what['nodelist']:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        result['nodelist'][node] = message

  if 'node-net-test' in what:
    result['node-net-test'] = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own entry to know which source addresses to use
    for name, pip, sip in what['node-net-test']:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      result['node-net-test'][my_name] = ("Can't find my own"
                                          " primary/secondary IP"
                                          " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what['node-net-test']:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          result['node-net-test'][name] = ("failure using the %s"
                                           " interface(s)" %
                                           " and ".join(fail))

  return result
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @param vg_name: the volume group whose LVs to list
  @return: dictionary of all partions (key) with their size (in MiB),
      plus inactive and online flags, e.g.
      C{{'test1': ('20.06', True, True)}}; on lvs failure, the command
      output (a string) is returned instead

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr positions 4 and 5 encode the state and open flags
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
def ListVolumeGroups():
  """List the volume groups and their size.

  @return: dictionary with keys volume name and values the size of the volume

  """
  return utils.ListVolumeGroups()
def NodeVolumes():
  """List all volumes on this node.

  @return: a list of dictionaries with the keys 'name', 'size', 'dev'
      and 'vg', one per logical volume; an empty container on lvs failure

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return {}

  def parse_dev(dev):
    # strip the "(extent)" suffix lvs appends to each device path
    if '(' in dev:
      return dev.split('(')[0]
    else:
      return dev

  def map_line(line):
    return {
      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),
    }

  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @param bridges_list: the list of bridge names to check
  @return: True if all of them exist, False otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      return False

  return True
430 def GetInstanceList(hypervisor_list):
431 """Provides a list of instances.
433 @type hypervisor_list: list
434 @param hypervisor_list: the list of hypervisors to query information
437 @return: a list of all running instances on the current node
438 - instance1.example.com
439 - instance2.example.com
443 for hname in hypervisor_list:
445 names = hypervisor.GetHypervisor(hname).ListInstances()
446 results.extend(names)
447 except errors.HypervisorError, err:
448 logging.exception("Error enumerating instances for hypevisor %s", hname)
449 # FIXME: should we somehow not propagate this to the master?
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # iinfo layout: (name, id, memory, vcpus, state, time)
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of `GetInstanceInfo()`, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict of dicts
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpuus: the number of vcpus
  @raise errors.HypervisorError: if the same instance is reported by
      two hypervisors with different parameters

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, inst_id, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output and output[name] != value:
          raise errors.HypervisorError("Instance %s running duplicate"
                                       " with different parameters" % name)
        output[name] = value

  return output
519 def AddOSToInstance(instance):
520 """Add an OS to an instance.
522 @type instance: L{objects.Instance}
523 @param instance: Instance whose OS is to be installed
526 inst_os = OSFromDisk(instance.os)
528 create_script = inst_os.create_script
529 create_env = OSEnvironment(instance)
531 logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
532 instance.name, int(time.time()))
533 if not os.path.exists(constants.LOG_OS_DIR):
534 os.mkdir(constants.LOG_OS_DIR, 0750)
536 command = utils.BuildShellCmd("cd %s && %s &>%s",
537 inst_os.path, create_script, logfile)
539 result = utils.RunCmd(command, env=create_env)
541 logging.error("os create command '%s' returned error: %s, logfile: %s,"
542 " output: %s", command, result.fail_reason, logfile,
549 def RunRenameInstance(instance, old_name):
550 """Run the OS rename script for an instance.
552 @type instance: objects.Instance
553 @param instance: Instance whose OS is to be installed
554 @type old_name: string
555 @param old_name: previous instance name
558 inst_os = OSFromDisk(instance.os)
560 script = inst_os.rename_script
561 rename_env = OSEnvironment(instance)
562 rename_env['OLD_INSTANCE_NAME'] = old_name
564 logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
566 instance.name, int(time.time()))
567 if not os.path.exists(constants.LOG_OS_DIR):
568 os.mkdir(constants.LOG_OS_DIR, 0750)
570 command = utils.BuildShellCmd("cd %s && %s &>%s",
571 inst_os.path, script, logfile)
573 result = utils.RunCmd(command, env=rename_env)
576 logging.error("os create command '%s' returned error: %s output: %s",
577 command, result.fail_reason, result.output)
583 def _GetVGInfo(vg_name):
584 """Get informations about the volume group.
587 vg_name: the volume group
590 { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
592 vg_size is the total size of the volume group in MiB
593 vg_free is the free size of the volume group in MiB
594 pv_count are the number of physical disks in that vg
596 If an error occurs during gathering of data, we return the same dict
597 with keys all set to None.
600 retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
602 retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
603 "--nosuffix", "--units=m", "--separator=:", vg_name])
606 logging.error("volume group %s not present", vg_name)
608 valarr = retval.stdout.strip().rstrip(':').split(':')
612 "vg_size": int(round(float(valarr[0]), 0)),
613 "vg_free": int(round(float(valarr[1]), 0)),
614 "pv_count": int(valarr[2]),
616 except ValueError, err:
617 logging.exception("Fail to parse vgs output")
619 logging.error("vgs output has the wrong number of fields (expected"
620 " three): %s", str(valarr))
def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @param instance: the instance whose disks to look up
  @return: list of (disk, device) pairs
  @raise errors.BlockDeviceError: if a disk's device can't be found

  """
  block_devices = []
  for disk in instance.disks:
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    # NOTE(review): assumed the device is opened before use here —
    # confirm against upstream
    device.Open()
    block_devices.append((disk, device))
  return block_devices
642 def StartInstance(instance, extra_args):
643 """Start an instance.
645 @type instance: instance object
646 @param instance: the instance object
648 @return: whether the startup was successful or not
651 running_instances = GetInstanceList([instance.hypervisor])
653 if instance.name in running_instances:
656 block_devices = _GatherBlockDevs(instance)
657 hyper = hypervisor.GetHypervisor(instance.hypervisor)
660 hyper.StartInstance(instance, block_devices, extra_args)
661 except errors.HypervisorError, err:
662 logging.exception("Failed to start instance")
668 def ShutdownInstance(instance):
669 """Shut an instance down.
671 @type instance: instance object
672 @param instance: the instance object
674 @return: whether the startup was successful or not
677 hv_name = instance.hypervisor
678 running_instances = GetInstanceList([hv_name])
680 if instance.name not in running_instances:
683 hyper = hypervisor.GetHypervisor(hv_name)
685 hyper.StopInstance(instance)
686 except errors.HypervisorError, err:
687 logging.error("Failed to stop instance")
690 # test every 10secs for 2min
694 for dummy in range(11):
695 if instance.name not in GetInstanceList([hv_name]):
699 # the shutdown did not succeed
700 logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
703 hyper.StopInstance(instance, force=True)
704 except errors.HypervisorError, err:
705 logging.exception("Failed to stop instance")
709 if instance.name in GetInstanceList([hv_name]):
710 logging.error("could not shutdown instance '%s' even by destroy",
717 def RebootInstance(instance, reboot_type, extra_args):
718 """Reboot an instance.
721 instance - name of instance to reboot
722 reboot_type - how to reboot [soft,hard,full]
725 running_instances = GetInstanceList([instance.hypervisor])
727 if instance.name not in running_instances:
728 logging.error("Cannot reboot instance that is not running")
731 hyper = hypervisor.GetHypervisor(instance.hypervisor)
732 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
734 hyper.RebootInstance(instance)
735 except errors.HypervisorError, err:
736 logging.exception("Failed to soft reboot instance")
738 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
740 ShutdownInstance(instance)
741 StartInstance(instance, extra_args)
742 except errors.HypervisorError, err:
743 logging.exception("Failed to hard reboot instance")
746 raise errors.ParameterError("reboot_type invalid")
751 def MigrateInstance(instance, target, live):
752 """Migrates an instance to another node.
754 @type instance: C{objects.Instance}
755 @param instance: the instance definition
757 @param target: the target node name
759 @param live: whether the migration should be done live or not (the
760 interpretation of this parameter is left to the hypervisor)
762 @return: a tuple of (success, msg) where:
763 - succes is a boolean denoting the success/failure of the operation
764 - msg is a string with details in case of failure
767 hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
770 hyper.MigrateInstance(instance.name, target, live)
771 except errors.HypervisorError, err:
772 msg = "Failed to migrate instance: %s" % str(err)
775 return (True, "Migration successfull")
778 def CreateBlockDevice(disk, size, owner, on_primary, info):
779 """Creates a block device for an instance.
782 disk: a ganeti.objects.Disk object
783 size: the size of the physical underlying device
784 owner: a string with the name of the instance
785 on_primary: a boolean indicating if it is the primary node or not
786 info: string that will be sent to the physical device creation
789 the new unique_id of the device (this can sometime be
790 computed only after creation), or None. On secondary nodes,
791 it's not required to return anything.
796 for child in disk.children:
797 crdev = _RecursiveAssembleBD(child, owner, on_primary)
798 if on_primary or disk.AssembleOnSecondary():
799 # we need the children open in case the device itself has to
804 device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
805 if device is not None:
806 logging.info("removing existing device %s", disk)
808 except errors.BlockDeviceError, err:
811 device = bdev.Create(disk.dev_type, disk.physical_id,
814 raise ValueError("Can't create child device for %s, %s" %
816 if on_primary or disk.AssembleOnSecondary():
817 if not device.Assemble():
818 errorstring = "Can't assemble device after creation"
819 logging.error(errorstring)
820 raise errors.BlockDeviceError("%s, very unusual event - check the node"
821 " daemon logs" % errorstring)
822 device.SetSyncSpeed(constants.SYNC_SPEED)
823 if on_primary or disk.OpenOnSecondary():
824 device.Open(force=True)
825 DevCacheManager.UpdateCache(device.dev_path, owner,
826 on_primary, disk.iv_name)
830 physical_id = device.unique_id
834 def RemoveBlockDevice(disk):
835 """Remove a block device.
837 This is intended to be called recursively.
841 # since we are removing the device, allow a partial match
842 # this allows removal of broken mirrors
843 rdev = _RecursiveFindBD(disk, allow_partial=True)
844 except errors.BlockDeviceError, err:
845 # probably can't attach
846 logging.info("Can't attach to device %s in remove", disk)
849 r_path = rdev.dev_path
850 result = rdev.Remove()
852 DevCacheManager.RemoveCache(r_path)
856 for child in disk.children:
857 result = result and RemoveBlockDevice(child)
861 def _RecursiveAssembleBD(disk, owner, as_primary):
862 """Activate a block device for an instance.
864 This is run on the primary and secondary nodes for an instance.
866 This function is called recursively.
869 disk: a objects.Disk object
870 as_primary: if we should make the block device read/write
873 the assembled device or None (in case no device was assembled)
875 If the assembly is not successful, an exception is raised.
880 mcn = disk.ChildrenNeeded()
882 mcn = 0 # max number of Nones allowed
884 mcn = len(disk.children) - mcn # max number of Nones
885 for chld_disk in disk.children:
887 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
888 except errors.BlockDeviceError, err:
889 if children.count(None) >= mcn:
892 logging.debug("Error in child activation: %s", str(err))
893 children.append(cdev)
895 if as_primary or disk.AssembleOnSecondary():
896 r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
897 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
899 if as_primary or disk.OpenOnSecondary():
901 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
902 as_primary, disk.iv_name)
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  Returns:
    a /dev path for primary nodes
    True for secondary nodes

  """
  result = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(result, bdev.BlockDev):
    result = result.dev_path
  return result
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @param disk: the objects.Disk to shut down
  @return: True if the device tree was shut down successfully

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    # a device that isn't assembled is already "down"
    result = True
  if disk.children:
    for child in disk.children:
      result = result and ShutdownBlockDevice(child)
  return result
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @param parent_cdev: the objects.Disk of the mirror to extend
  @param new_cdevs: list of objects.Disk to add as children
  @return: True on success, False if a device can't be found

  """
  # allow a partial match: the mirror may be degraded already
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent_bdev.AddChildren(new_bdevs)
  return True
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @param parent_cdev: the objects.Disk of the mirror to shrink
  @param new_cdevs: list of objects.Disk to remove from the mirror
  @return: True on success, False if a device can't be found

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  devs = []
  for disk in new_cdevs:
    # prefer the static device path; fall back to a dynamic lookup
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        logging.error("Can't find dynamic device %s while removing children",
                      disk)
        return False
      else:
        devs.append(bd.dev_path)
    else:
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
  return True
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()
  Raises:
    errors.BlockDeviceError when a device can't be found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
    stats.append(rbd.CombinedSyncStatus())
  return stats
def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors
                   (NOTE(review): not referenced in the visible body;
                   kept for interface compatibility — confirm usage)

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    return rbd
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @param file_name: absolute path of the file to write
  @param data: the new file contents
  @param mode: the file mode bits
  @param uid: the owner uid
  @param gid: the owner gid
  @param atime: access time to set
  @param mtime: modification time to set
  @return: True on success, False if the target is not allowed

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # whitelist of files the master is allowed to overwrite
  allowed_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True
1083 def _ErrnoOrStr(err):
1084 """Format an EnvironmentError exception.
1086 If the `err` argument has an errno attribute, it will be looked up
1087 and converted into a textual EXXXX description. Otherwise the string
1088 representation of the error will be returned.
1091 if hasattr(err, 'errno'):
1092 detail = errno.errorcode[err.errno]
1098 def _OSOndiskVersion(name, os_dir):
1099 """Compute and return the API version of a given OS.
1101 This function will try to read the API version of the os given by
1102 the 'name' parameter and residing in the 'os_dir' directory.
1104 Return value will be either an integer denoting the version or None in the
1105 case when this is not a valid OS name.
1108 api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1111 st = os.stat(api_file)
1112 except EnvironmentError, err:
1113 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1114 " found (%s)" % _ErrnoOrStr(err))
1116 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1117 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1123 api_versions = f.readlines()
1126 except EnvironmentError, err:
1127 raise errors.InvalidOS(name, os_dir, "error while reading the"
1128 " API version (%s)" % _ErrnoOrStr(err))
1130 api_versions = [version.strip() for version in api_versions]
1132 api_versions = [int(version) for version in api_versions]
1133 except (TypeError, ValueError), err:
1134 raise errors.InvalidOS(name, os_dir,
1135 "API version is not integer (%s)" % str(err))
1140 def DiagnoseOS(top_dirs=None):
1141 """Compute the validity for all OSes.
1143 Returns an OS object for each name in all the given top directories
1144 (if not given defaults to constants.OS_SEARCH_PATH)
1150 if top_dirs is None:
1151 top_dirs = constants.OS_SEARCH_PATH
1154 for dir_name in top_dirs:
1155 if os.path.isdir(dir_name):
1157 f_names = utils.ListVisibleFiles(dir_name)
1158 except EnvironmentError, err:
1159 logging.exception("Can't list the OS directory %s", dir_name)
1161 for name in f_names:
1163 os_inst = OSFromDisk(name, base_dir=dir_name)
1164 result.append(os_inst)
1165 except errors.InvalidOS, err:
1166 result.append(objects.OS.FromInvalidOS(err))
1171 def OSFromDisk(name, base_dir=None):
1172 """Create an OS instance from disk.
1174 This function will return an OS instance if the given name is a
1175 valid OS name. Otherwise, it will raise an appropriate
1176 `errors.InvalidOS` exception, detailing why this is not a valid
1179 @type base_dir: string
1180 @keyword base_dir: Base directory containing OS installations.
1181 Defaults to a search in all the OS_SEARCH_PATH dirs.
1185 if base_dir is None:
1186 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1188 raise errors.InvalidOS(name, None, "OS dir not found in search path")
1190 os_dir = os.path.sep.join([base_dir, name])
1192 api_versions = _OSOndiskVersion(name, os_dir)
1194 if constants.OS_API_VERSION not in api_versions:
1195 raise errors.InvalidOS(name, os_dir, "API version mismatch"
1196 " (found %s want %s)"
1197 % (api_versions, constants.OS_API_VERSION))
1199 # OS Scripts dictionary, we will populate it with the actual script names
1200 os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1202 for script in os_scripts:
1203 os_scripts[script] = os.path.sep.join([os_dir, script])
1206 st = os.stat(os_scripts[script])
1207 except EnvironmentError, err:
1208 raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1209 (script, _ErrnoOrStr(err)))
1211 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1212 raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1215 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1216 raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1220 return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1221 create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1222 export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1223 import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1224 rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1225 api_versions=api_versions)
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  @type instance: instance object
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for os api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if a disk's device can't be found

  """
  result = {}
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
  result['INSTANCE_NAME'] = instance.name
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    # FIXME: When disks will have read-only mode, populate this
    result['DISK_%d_ACCESS' % idx] = 'W'
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  return result
1273 def GrowBlockDevice(disk, amount):
1274 """Grow a stack of block devices.
1276 This function is called recursively, with the childrens being the
1280 disk: the disk to be grown
1282 Returns: a tuple of (status, result), with:
1283 status: the result (true/false) of the operation
1284 result: the error message if the operation failed, otherwise not used
1287 r_dev = _RecursiveFindBD(disk)
1289 return False, "Cannot find block device %s" % (disk,)
1293 except errors.BlockDeviceError, err:
1294 return False, str(err)
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path
  @raise errors.ProgrammerError: if asked to snapshot a non-lvm,
      child-less device

  """
  # Fix: restore the outer 'if disk.children:' guard; without it the
  # len() test crashes for devices without children and the LV branch
  # below becomes mis-paired.
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      return None
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))
def ExportSnapshot(disk, dest_node, instance, cluster_name):
  """Export a block device snapshot to a remote node.

  Args:
    disk: the snapshot block device
    dest_node: the node to send the image to
    instance: instance being exported
    cluster_name: name of the cluster (used to build the ssh command)

  Returns:
    True if successful, False otherwise.

  """
  # TODO(ultrotter): Import/Export still to be converted to OS API 10
  logging.error("Import/Export still to be converted to OS API 10")

  # load the OS definition so we can run its export script
  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  # per-run log file for the OS export script output
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  real_os_dev = _RecursiveFindBD(disk)
  if real_os_dev is None:
    # NOTE(review): this statement is truncated in the extracted source —
    # the format argument and closing paren are missing; confirm upstream.
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
  # export lands in a ".new" staging directory on the destination node
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # validity before combining them
  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
                               export_script, instance.name,
                               real_os_dev.dev_path, logfile)
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  # NOTE(review): BuildCmd is truncated here — the final (command)
  # argument, presumably destcmd, is missing in the extracted source.
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
  # all commands have been checked, so we're safe to combine them
  # NOTE(review): 'comprcmd' (the compression command) is not defined in
  # the visible code — its assignment appears to have been lost.
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command)

  # NOTE(review): the failure guard and the True/False returns are
  # missing from the extracted source; this log call likely sat under an
  # 'if result.failed:' branch.
  logging.error("os snapshot export command '%s' returned error: %s"
                " output: %s", command, result.fail_reason, result.output)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  # staging dir; the finished export is atomically moved to finaldestdir
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # export-level metadata section
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  # instance-level metadata section
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    # NOTE(review): this call is truncated in the extracted source — the
    # bridge value argument (presumably '%s' % nic.bridge) is missing.
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)

  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    # NOTE(review): truncated — the size value argument is missing here.
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  # NOTE(review): the config.write(cfo) / cfo.close() calls appear to be
  # missing from the extracted source; confirm against upstream.
  # replace any previous export for this instance atomically
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  Args:
    dest: directory containing the export

  Returns:
    A serializable config file containing the export info, or None if
    the export information is missing or malformed.

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  # Fix: the config file was never read in the visible code, and neither
  # outcome was returned.
  config.read(cff)

  # both the export and the instance sections must be present for the
  # export to be considered valid
  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    return None

  return config
# NOTE(review): the def header is truncated in the extracted source — the
# final parameter (presumably cluster_name) and closing '):' are missing.
def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
  """Import an os image into an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device
    src_node: node holding the source image
    src_image: path to the source image on src_node

  Returns:
    False in case of error, True otherwise.

  """
  # TODO(ultrotter): Import/Export still to be converted to OS API 10
  logging.error("Import/Export still to be converted to OS API 10")

  # load the OS definition so we can run its import script
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    # NOTE(review): an error return likely followed here; missing in the
    # extracted source.
    logging.error("Can't find this device-visible name '%s'", os_disk)

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    # NOTE(review): truncated — format argument and closing paren missing.
    raise errors.BlockDeviceError("Block device '%s' is not set up" %

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    # NOTE(review): truncated — format argument and closing paren missing.
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
  real_swap_dev.Open()

  # per-run log file for the OS import script output
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  # fetch the image from the source node over ssh and pipe it into the
  # OS import script locally
  destcmd = utils.BuildShellCmd('cat %s', src_image)
  # NOTE(review): BuildCmd is truncated — the final (command) argument,
  # presumably destcmd, is missing in the extracted source.
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                   constants.GANETI_RUNAS,

  # NOTE(review): truncated — the final logfile argument and closing
  # paren are missing.
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
                               inst_os.path, import_script, instance.name,
                               real_os_dev.dev_path, real_swap_dev.dev_path,

  # 'comprcmd' (decompression) is not defined in the visible code — its
  # assignment appears to have been lost in extraction.
  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
  env = {'HYPERVISOR': instance.hypervisor}

  result = utils.RunCmd(command, env=env)

  # NOTE(review): the failure guard and True/False returns are missing;
  # this log call likely sat under an 'if result.failed:' branch.
  logging.error("os import command '%s' returned error: %s"
                " output: %s", command, result.fail_reason, result.output)
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the export names (directory entries under EXPORT_DIR);
      empty list if the export directory does not exist

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  else:
    # Fix: without this branch the function implicitly returned None,
    # breaking callers that iterate over the result.
    return []
1565 def RemoveExport(export):
1566 """Remove an existing export from the node.
1569 export: the name of the export to remove
1572 False in case of error, True otherwise.
1575 target = os.path.join(constants.EXPORT_DIR, export)
1577 shutil.rmtree(target)
1578 # TODO: catch some of the relevant exceptions and provide a pretty
1579 # error message if rmtree fails.
def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  The devlist argument is a list of tuples (disk, new_logical,
  new_physical). The return value will be a combined boolean result
  (True only if all renames succeeded).

  """
  # NOTE(review): the combined-result accumulator and the 'dev is None'
  # guard appear to be missing from the extracted source.
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    # NOTE(review): a 'try:' likely opened here — the except below has no
    # visible matching try in the extracted source.
    old_rpath = dev.dev_path
    dev.Rename(unique_id)
    new_rpath = dev.dev_path
    if old_rpath != new_rpath:
      # drop the stale cache entry keyed on the old device path
      DevCacheManager.RemoveCache(old_rpath)
      # FIXME: we should add the new cache information here, like:
      # DevCacheManager.UpdateCache(new_rpath, owner, ...)
      # but we don't have the owner here - maybe parse from existing
      # cache? for now, we only lose lvm data when we rename, which
      # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
  # NOTE(review): the final return of the combined result is missing.
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  Args:
    file_storage_dir: string with path

  Returns:
    normalized file_storage_dir (string) if valid, None otherwise

  """
  # Fix: 'cfg' was referenced without ever being bound, and the error
  # path fell through without returning None.
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # only accept paths under the cluster-wide base directory
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    creation was successful or not

  """
  # validate the path against the cluster-wide base directory first
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  # NOTE(review): the result-tuple plumbing (result = True,/False, and
  # the final return) appears to be missing from the extracted source.
  if not file_storage_dir:
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      # path exists but is a plain file — refuse to use it
      logging.error("'%s' is not a directory", file_storage_dir)
    # NOTE(review): a 'try:' likely opened here — the except below has no
    # visible matching try.
    os.makedirs(file_storage_dir, 0750)
  except OSError, err:
    logging.error("Cannot create file storage directory '%s': %s",
                  file_storage_dir, err)
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    removal was successful or not

  """
  # validate the path against the cluster-wide base directory first
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  # NOTE(review): the result-tuple plumbing and final return appear to be
  # missing from the extracted source.
  if not file_storage_dir:
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      logging.error("'%s' is not a directory", file_storage_dir)
    # deletes dir only if empty, otherwise we want to return False
    # NOTE(review): a 'try:' likely opened here — the except below has no
    # visible matching try.
    os.rmdir(file_storage_dir)
  except OSError, err:
    # NOTE(review): this call is truncated — the path argument and
    # closing paren are missing.
    logging.exception("Cannot remove file storage directory '%s'",
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  Args:
    old_file_storage_dir: string containing the old path
    new_file_storage_dir: string containing the new path

  Returns:
    tuple with first element a boolean indicating whether dir
    rename was successful or not

  """
  # both paths must validate against the cluster-wide base directory
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  # NOTE(review): the result-tuple plumbing, several else branches and
  # the final return appear to be missing from the extracted source.
  if not old_file_storage_dir or not new_file_storage_dir:
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      # NOTE(review): a 'try:' likely opened here for the rename.
      os.rename(old_file_storage_dir, new_file_storage_dir)
    except OSError, err:
      logging.exception("Cannot rename '%s' to '%s'",
                        old_file_storage_dir, new_file_storage_dir)
    # reached when the old path exists but is not a directory
    logging.error("'%s' is not a directory", old_file_storage_dir)
  # the new path already exists; renaming over it would clobber data
  if os.path.exists(old_file_storage_dir):
    logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                  old_file_storage_dir, new_file_storage_dir)
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: string
  @param file_name: the (absolute) file name to check
  @rtype: bool
  @return: True if the file lives under the queue directory

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  # Fix: the rejection log needs its guard and the result must be
  # returned; both were missing in the visible code.
  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  @type file_name: string
  @param file_name: the (absolute) file name to write
  @type content: string
  @param content: the new contents of the file
  @rtype: bool
  @return: False if the file is outside the queue directory, True on success

  """
  # refuse to touch anything outside the queue directory
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)

  return True
def JobQueueRename(old, new):
  """Renames a job queue file.

  @type old: string
  @param old: the current (absolute) file name
  @type new: string
  @param new: the desired (absolute) file name
  @rtype: bool
  @return: False if either path is outside the queue directory,
      True on success

  """
  # both source and destination must live inside the queue directory
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  os.rename(old, new)

  return True
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: bool
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: bool
  @return: always True

  """
  # Fix: the two operations must be mutually exclusive on drain_flag;
  # in the visible code both executed unconditionally. The flag is
  # represented by the existence of the drain file.
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

  return True
1796 def CloseBlockDevices(disks):
1797 """Closes the given block devices.
1799 This means they will be switched to secondary mode (in case of DRBD).
1804 rd = _RecursiveFindBD(cf)
1806 return (False, "Can't find device %s" % cf)
1813 except errors.BlockDeviceError, err:
1814 msg.append(str(err))
1816 return (False, "Can't make devices secondary: %s" % ",".join(msg))
1818 return (True, "All devices secondary")
1821 def ValidateHVParams(hvname, hvparams):
1822 """Validates the given hypervisor parameters.
1824 @type hvname: string
1825 @param hvname: the hypervisor name
1826 @type hvparams: dict
1827 @param hvparams: the hypervisor parameters to be validated
1828 @rtype: tuple (bool, str)
1829 @return: tuple of (success, message)
1833 hv_type = hypervisor.GetHypervisor(hvname)
1834 hv_type.ValidateParameters(hvparams)
1835 return (True, "Validation passed")
1836 except errors.HypervisorError, err:
1837 return (False, str(err))
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  # only files matching this pattern are considered runnable hooks
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  # NOTE(review): no 'self' parameter — presumably a @staticmethod whose
  # decorator was lost in extraction; confirm against upstream.
  def ExecHook(script, env):
    """Exec one hook script.

    Args:
      - script: the full path to the script
      - env: the environment with which to exec the script

    Returns:
      tuple of (success boolean, captured output)

    """
    # exec the process using subprocess and log the output
    # stdin is redirected from /dev/null so hooks cannot block on input
    fdstdin = open("/dev/null", "r")
    child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT, close_fds=True,
                             shell=False, cwd="/", env=env)
    # NOTE(review): a 'try:' likely opened here — the except below has no
    # visible matching try; only the first 4 KiB of output is kept.
    output = child.stdout.read(4096)
    child.stdout.close()
    except EnvironmentError, err:
      output += "Hook script error: %s" % str(err)
    # NOTE(review): the retry loop around wait() (on EINTR) appears
    # truncated in the extracted source.
    result = child.wait()
    except EnvironmentError, err:
      if err.errno == errno.EINTR:
    # try not to leak fds
    for fd in (fdstdin, ):
      # NOTE(review): the fd.close() call under a try likely preceded
      # this except; missing in the extracted source.
      except EnvironmentError, err:
        # just log the error
        #logging.exception("Error while closing fd %s", fd)
    # exit status 0 means the hook succeeded
    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will not be usually overriden by child opcodes.

    """
    # NOTE(review): the 'suffix' assignments for the pre/post phases are
    # missing from the extracted source.
    if phase == constants.HOOKS_PHASE_PRE:
    elif phase == constants.HOOKS_PHASE_POST:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    # hooks live in a per-(path, phase) subdirectory, e.g. "instance-add-pre.d"
    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    # NOTE(review): the 'rr = []' accumulator init, a try around the
    # listing, and the early-return on missing dir appear truncated.
    dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # skip anything that is not an executable regular file with an
      # allowed name
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
              self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
      result, output = self.ExecHook(fname, env)
      # NOTE(review): the success/failure branching around these two
      # assignments appears truncated in the extracted source.
      rrval = constants.HKR_FAIL
      rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
    # NOTE(review): the final 'return rr' is missing.
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side: the allocator scripts only exist on nodes.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    Args:
      - name: name of the allocator script to look up and run
      - idata: allocator input data, written to a temp file for the script

    Return value: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout of the allocator (or None)
       - stderr of the allocator (or None)
       - fail reason (as from utils.RunResult)

    """
    # NOTE(review): this call is truncated — the trailing arguments and
    # closing paren are missing in the extracted source.
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # the input data is handed to the script via a temporary file
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    # NOTE(review): the try/finally writing idata to fd, closing it and
    # removing the temp file appears truncated in the extracted source.
    result = utils.RunCmd([alloc_script, fin_name])
    # NOTE(review): the 'if result.failed:' guard and the fail_reason
    # argument of this return appear truncated.
    return (constants.IARUN_FAILURE, result.stdout, result.stderr,
    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  # prefix stripped from device paths when building cache file names
  _DEV_PREFIX = "/dev/"
  # directory holding the per-device cache files
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  # NOTE(review): the methods below take 'cls' — presumably @classmethod
  # decorators were lost in extraction; confirm against upstream.
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    # NOTE(review): the 'return fpath' is missing in the extracted source.

  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    """
    if dev_path is None:
      # NOTE(review): an early return likely followed this log call.
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
    fpath = cls._ConvertPath(dev_path)
    # NOTE(review): the computation of 'state' from on_primary and the
    # guard setting iv_name appear truncated in the extracted source.
    iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    # NOTE(review): a 'try:' likely opened here — the except below has no
    # visible matching try; cache write failures are only logged.
    utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)

  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    """
    if dev_path is None:
      # NOTE(review): an early return likely followed this log call.
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
    fpath = cls._ConvertPath(dev_path)
    # NOTE(review): a 'try:' likely opened here — the except below has no
    # visible matching try; removal failures are only logged.
    utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)