4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon"""
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
# NOTE(review): the 'def _GetSshRunner():' header is missing from this listing
# (gap in the embedded numbering); this is the tail of that helper, which
# returns a freshly constructed ssh.SshRunner.
48 return ssh.SshRunner()
# NOTE(review): the 'def _GetMasterInfo():' header and the 'try:' opening the
# except clause below are missing from this listing (numbering gaps).
# Reads the master netdev and IP from the simple store; on configuration
# errors it logs and (presumably) falls through with None values — confirm.
52 """Return the master ip and netdev.
56 ss = ssconf.SimpleStore()
57 master_netdev = ss.GetMasterNetdev()
58 master_ip = ss.GetMasterIP()
59 except errors.ConfigurationError, err:
60 logging.exception("Cluster configuration incomplete")
62 return (master_netdev, master_ip)
# NOTE(review): gaps in the embedded numbering — several original lines
# (return paths, 'if result.failed:' checks) are not shown here.
65 def StartMaster(start_daemons):
66 """Activate local node as master node.
68 The function will always try to activate the IP address of the master
69 (if someone else has it, then it won't). Then, if the start_daemons
70 parameter is True, it will also start the master daemons
71 (ganeti-masterd and ganeti-rapi).
75 master_netdev, master_ip = _GetMasterInfo()
79 if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
80 if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
81 source=constants.LOCALHOST_IP_ADDRESS):
82 # we already have the ip:
83 logging.debug("Already started")
85 logging.error("Someone else has the master ip, not activating")
88 result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
89 "dev", master_netdev, "label",
90 "%s:0" % master_netdev])
92 logging.error("Can't activate master IP: %s", result.output)
# gratuitous ARP so neighbours update their caches for the moved master IP
95 result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
96 "-s", master_ip, master_ip])
97 # we'll ignore the exit code of arping
99 # and now start the master and rapi daemons
101 for daemon in 'ganeti-masterd', 'ganeti-rapi':
102 result = utils.RunCmd([daemon])
104 logging.error("Can't start daemon %s: %s", daemon, result.output)
# NOTE(review): gaps in the embedded numbering — some original lines
# (e.g. the early return when no netdev, 'if result.failed:') are not shown.
109 def StopMaster(stop_daemons):
110 """Deactivate this node as master.
112 The function will always try to deactivate the IP address of the
113 master. Then, if the stop_daemons parameter is True, it will also
114 stop the master daemons (ganeti-masterd and ganeti-rapi).
117 master_netdev, master_ip = _GetMasterInfo()
118 if not master_netdev:
121 result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
122 "dev", master_netdev])
124 logging.error("Can't remove the master IP, error: %s", result.output)
125 # but otherwise ignore the failure
128 # stop/kill the rapi and the master daemon
129 for daemon in constants.RAPI_PID, constants.MASTERD_PID:
130 utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
# NOTE(review): gaps in the embedded numbering — the 'try:' matching the
# except clause below and some return lines are not shown here.
135 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
136 """Joins this node to the cluster.
138 This does the following:
139 - updates the hostkeys of the machine (rsa and dsa)
140 - adds the ssh private key to the user
141 - adds the ssh public key to the users' authorized_keys file
# host keys are written with tight modes: private 0600, public 0644
144 sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
145 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
146 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
147 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
148 for name, content, mode in sshd_keys:
149 utils.WriteFile(name, data=content, mode=mode)
152 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
154 except errors.OpExecError, err:
155 logging.exception("Error while processing user ssh files")
158 for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
159 utils.WriteFile(name, data=content, mode=0600)
161 utils.AddAuthorizedKey(auth_keys, sshpub)
# restart sshd so the new host keys take effect
163 utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
# NOTE(review): the 'def LeaveCluster():' header is missing from this listing,
# as are the 'try:' lines matching the except clauses below (numbering gaps).
169 """Cleans up the current node and prepares it to be removed from the cluster.
# remove all regular (non-symlink) files from the Ganeti data directory
172 if os.path.isdir(constants.DATA_DIR):
173 for rel_name in utils.ListVisibleFiles(constants.DATA_DIR):
174 full_name = os.path.join(constants.DATA_DIR, rel_name)
175 if os.path.isfile(full_name) and not os.path.islink(full_name):
176 utils.RemoveFile(full_name)
179 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
180 except errors.OpExecError:
181 logging.exception("Error while processing ssh files")
184 f = open(pub_key, 'r')
186 utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
190 utils.RemoveFile(priv_key)
191 utils.RemoveFile(pub_key)
193 # Return a reassuring string to the caller, and quit
194 raise errors.QuitGanetiException(False, 'Shutdown scheduled')
# NOTE(review): gaps in the embedded numbering — the initialization of
# 'outputarray' and the file close/return lines are not shown here.
197 def GetNodeInfo(vgname):
198 """Gives back a hash with different information about the node.
201 { 'vg_size' : xxx, 'vg_free' : xxx, 'memory_domain0': xxx,
202 'memory_free' : xxx, 'memory_total' : xxx }
204 vg_size is the size of the configured volume group in MiB
205 vg_free is the free size of the volume group in MiB
206 memory_dom0 is the memory allocated for domain0 in MiB
207 memory_free is the currently available (free) ram in MiB
208 memory_total is the total number of ram in MiB
212 vginfo = _GetVGInfo(vgname)
213 outputarray['vg_size'] = vginfo['vg_size']
214 outputarray['vg_free'] = vginfo['vg_free']
216 hyper = hypervisor.GetHypervisor()
217 hyp_info = hyper.GetNodeInfo()
218 if hyp_info is not None:
219 outputarray.update(hyp_info)
# boot_id identifies the current boot; used to detect node reboots
221 f = open("/proc/sys/kernel/random/boot_id", 'r')
223 outputarray["bootid"] = f.read(128).rstrip("\n")
# NOTE(review): gaps in the embedded numbering — the 'result = {}' setup,
# the success branches and the final return are not shown here.
230 def VerifyNode(what):
231 """Verify the status of the local node.
234 what - a dictionary of things to check:
235 'filelist' : list of files for which to compute checksums
236 'nodelist' : list of nodes we should check communication with
237 'hypervisor': run the hypervisor-specific verify
239 Requested files on local node are checksummed and the result returned.
241 The nodelist is traversed, with the following checks being made
243 - known_hosts key correct
244 - correct resolving of node name (target node returns its own hostname
245 by ssh-execution of 'hostname', result compared against name in list.
250 if 'hypervisor' in what:
251 result['hypervisor'] = hypervisor.GetHypervisor().Verify()
253 if 'filelist' in what:
254 result['filelist'] = utils.FingerprintFiles(what['filelist'])
256 if 'nodelist' in what:
257 result['nodelist'] = {}
# shuffle so that all nodes don't ssh-probe the cluster in the same order
258 random.shuffle(what['nodelist'])
259 for node in what['nodelist']:
260 success, message = _GetSshRunner().VerifyNodeHostname(node)
262 result['nodelist'][node] = message
263 if 'node-net-test' in what:
264 result['node-net-test'] = {}
265 my_name = utils.HostInfo().name
266 my_pip = my_sip = None
# first pass: find our own primary/secondary IPs in the node list
267 for name, pip, sip in what['node-net-test']:
273 result['node-net-test'][my_name] = ("Can't find my own"
274 " primary/secondary IP"
277 port = ssconf.SimpleStore().GetNodeDaemonPort()
# second pass: TCP-ping every node's primary and secondary IP
278 for name, pip, sip in what['node-net-test']:
280 if not utils.TcpPing(pip, port, source=my_pip):
281 fail.append("primary")
283 if not utils.TcpPing(sip, port, source=my_sip):
284 fail.append("secondary")
286 result['node-net-test'][name] = ("failure using the %s"
# NOTE(review): gaps in the embedded numbering — the 'sep' definition, the
# 'if result.failed:' check, the 'lvs = {}' setup and the return are not shown.
293 def GetVolumeList(vg_name):
294 """Compute list of logical volumes and their size.
297 dictionary of all partitions (key) with their size (in MiB), inactive
299 {'test1': ('20.06', True, True)}
304 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
305 "--separator=%s" % sep,
306 "-olv_name,lv_size,lv_attr", vg_name])
308 logging.error("Failed to list logical volumes, lvs output: %s",
# expected line shape: "  name|size|attr" with a six-character attr field
312 valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
313 for line in result.stdout.splitlines():
315 match = valid_line_re.match(line)
317 logging.error("Invalid line returned from lvs output: '%s'", line)
319 name, size, attr = match.groups()
# lv_attr position 5 is the state flag ('-' = inactive), position 6 ('o')
# means the device is open; see lvs(8) for the attribute field layout
320 inactive = attr[4] == '-'
321 online = attr[5] == 'o'
322 lvs[name] = (size, inactive, online)
# Thin wrapper delegating entirely to utils.ListVolumeGroups().
327 def ListVolumeGroups():
328 """List the volume groups and their size.
331 Dictionary with keys volume name and values the size of the volume
334 return utils.ListVolumeGroups()
# NOTE(review): the 'def' header of this function is missing from this listing
# (numbering gap); presumably 'def NodeVolumes():' — confirm against upstream.
338 """List all volumes on this node.
341 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
343 "--options=lv_name,lv_size,devices,vg_name"])
345 logging.error("Failed to list logical volumes, lvs output: %s",
# strip the "(extent)" suffix lvs appends to the device path
351 return dev.split('(')[0]
357 'name': line[0].strip(),
358 'size': line[1].strip(),
359 'dev': parse_dev(line[2].strip()),
360 'vg': line[3].strip(),
# keep only lines with at least 4 pipe-separated fields
363 return [map_line(line.split('|')) for line in result.stdout.splitlines()
364 if line.count('|') >= 3]
# NOTE(review): the 'return False'/'return True' lines are not shown here
# (numbering gaps).
367 def BridgesExist(bridges_list):
368 """Check if a list of bridges exist on the current node.
371 True if all of them exist, false otherwise
374 for bridge in bridges_list:
375 if not utils.BridgeExists(bridge):
# NOTE(review): the 'try:' matching the except clause and the return/raise
# lines are not shown here (numbering gaps).
381 def GetInstanceList():
382 """Provides a list of instances.
385 A list of all running instances on the current node
386 - instance1.example.com
387 - instance2.example.com
391 names = hypervisor.GetHypervisor().ListInstances()
392 except errors.HypervisorError, err:
393 logging.exception("Error enumerating instances")
# NOTE(review): the 'output = {}' setup and the return are not shown here
# (numbering gaps).
399 def GetInstanceInfo(instance):
400 """Gives back the information about an instance as a dictionary.
403 instance: name of the instance (ex. instance1.example.com)
406 { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
408 memory: memory size of instance (int)
409 state: xen state of instance (string)
410 time: cpu time of instance (float)
415 iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
# iinfo tuple layout: indices 2/4/5 are memory/state/cpu-time respectively
416 if iinfo is not None:
417 output['memory'] = iinfo[2]
418 output['state'] = iinfo[4]
419 output['time'] = iinfo[5]
# NOTE(review): the dict construction inside the loop and the return are not
# shown here (numbering gaps).
424 def GetAllInstancesInfo():
425 """Gather data about all instances.
427 This is the equivalent of `GetInstanceInfo()`, except that it
428 computes data for all instances at once, thus being faster if one
429 needs data about more than one instance.
431 Returns: a dictionary of dictionaries, keys being the instance name,
433 { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
435 memory: memory size of instance (int)
436 state: xen state of instance (string)
437 time: cpu time of instance (float)
438 vcpus: the number of cpus
443 iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
445 for name, inst_id, memory, vcpus, state, times in iinfo:
# NOTE(review): gaps in the embedded numbering — return statements after the
# error logs and the final success return are not shown here.
456 def AddOSToInstance(instance, os_disk, swap_disk):
457 """Add an OS to an instance.
460 instance: the instance object
461 os_disk: the instance-visible name of the os device
462 swap_disk: the instance-visible name of the swap device
465 inst_os = OSFromDisk(instance.os)
467 create_script = inst_os.create_script
# map instance-visible disk names to config disk objects
469 os_device = instance.FindDisk(os_disk)
470 if os_device is None:
471 logging.error("Can't find this device-visible name '%s'", os_disk)
474 swap_device = instance.FindDisk(swap_disk)
475 if swap_device is None:
476 logging.error("Can't find this device-visible name '%s'", swap_disk)
# resolve config disks to actually-assembled block devices
479 real_os_dev = _RecursiveFindBD(os_device)
480 if real_os_dev is None:
481 raise errors.BlockDeviceError("Block device '%s' is not set up" %
485 real_swap_dev = _RecursiveFindBD(swap_device)
486 if real_swap_dev is None:
487 raise errors.BlockDeviceError("Block device '%s' is not set up" %
491 logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
492 instance.name, int(time.time()))
493 if not os.path.exists(constants.LOG_OS_DIR):
494 os.mkdir(constants.LOG_OS_DIR, 0750)
# the OS create script runs from the OS dir; all output goes to the logfile
496 command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
497 inst_os.path, create_script, instance.name,
498 real_os_dev.dev_path, real_swap_dev.dev_path,
501 result = utils.RunCmd(command)
503 logging.error("os create command '%s' returned error: %s, logfile: %s,"
504 " output: %s", command, result.fail_reason, logfile,
# NOTE(review): gaps in the embedded numbering — return statements after the
# error logs are not shown here. Structure mirrors AddOSToInstance above.
511 def RunRenameInstance(instance, old_name, os_disk, swap_disk):
512 """Run the OS rename script for an instance.
515 instance: the instance object
516 old_name: the old name of the instance
517 os_disk: the instance-visible name of the os device
518 swap_disk: the instance-visible name of the swap device
521 inst_os = OSFromDisk(instance.os)
523 script = inst_os.rename_script
525 os_device = instance.FindDisk(os_disk)
526 if os_device is None:
527 logging.error("Can't find this device-visible name '%s'", os_disk)
530 swap_device = instance.FindDisk(swap_disk)
531 if swap_device is None:
532 logging.error("Can't find this device-visible name '%s'", swap_disk)
535 real_os_dev = _RecursiveFindBD(os_device)
536 if real_os_dev is None:
537 raise errors.BlockDeviceError("Block device '%s' is not set up" %
541 real_swap_dev = _RecursiveFindBD(swap_device)
542 if real_swap_dev is None:
543 raise errors.BlockDeviceError("Block device '%s' is not set up" %
547 logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
549 instance.name, int(time.time()))
550 if not os.path.exists(constants.LOG_OS_DIR):
551 os.mkdir(constants.LOG_OS_DIR, 0750)
# -o old name, -n new name; output redirected to the per-rename logfile
553 command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
554 inst_os.path, script, old_name, instance.name,
555 real_os_dev.dev_path, real_swap_dev.dev_path,
558 result = utils.RunCmd(command)
# NOTE(review): message says "os create command" but this is the rename
# script — misleading log text; fixing it would be a behavior change.
561 logging.error("os create command '%s' returned error: %s output: %s",
562 command, result.fail_reason, result.output)
# NOTE(review): gaps in the embedded numbering — the 'try:' matching the
# except, the retdic update assignment and the return are not shown here.
568 def _GetVGInfo(vg_name):
569 """Get information about the volume group.
572 vg_name: the volume group
575 { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
577 vg_size is the total size of the volume group in MiB
578 vg_free is the free size of the volume group in MiB
579 pv_count are the number of physical disks in that vg
581 If an error occurs during gathering of data, we return the same dict
582 with keys all set to None.
# start with all keys mapped to None; only overwritten on full success
585 retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
587 retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
588 "--nosuffix", "--units=m", "--separator=:", vg_name])
591 logging.error("volume group %s not present", vg_name)
# vgs emits a trailing ':' which rstrip removes before splitting
593 valarr = retval.stdout.strip().rstrip(':').split(':')
597 "vg_size": int(round(float(valarr[0]), 0)),
598 "vg_free": int(round(float(valarr[1]), 0)),
599 "pv_count": int(valarr[2]),
601 except ValueError, err:
602 logging.exception("Fail to parse vgs output")
604 logging.error("vgs output has the wrong number of fields (expected"
605 " three): %s", str(valarr))
# NOTE(review): the 'block_devices = []' setup, the 'if device is None:'
# guard and the return are not shown here (numbering gaps).
609 def _GatherBlockDevs(instance):
610 """Set up an instance's block device(s).
612 This is run on the primary node at instance startup. The block
613 devices must be already assembled.
617 for disk in instance.disks:
618 device = _RecursiveFindBD(disk)
620 raise errors.BlockDeviceError("Block device '%s' is not set up." %
623 block_devices.append((disk, device))
# NOTE(review): the 'try:' matching the except clause and the return lines
# are not shown here (numbering gaps).
627 def StartInstance(instance, extra_args):
628 """Start an instance.
631 instance - name of instance to start.
634 running_instances = GetInstanceList()
# already running: nothing to do (return path not visible in this listing)
636 if instance.name in running_instances:
639 block_devices = _GatherBlockDevs(instance)
640 hyper = hypervisor.GetHypervisor()
643 hyper.StartInstance(instance, block_devices, extra_args)
644 except errors.HypervisorError, err:
645 logging.exception("Failed to start instance")
# NOTE(review): gaps in the embedded numbering — 'try:' lines, the sleep in
# the polling loop and several return lines are not shown here.
651 def ShutdownInstance(instance):
652 """Shut an instance down.
655 instance - name of instance to shutdown.
658 running_instances = GetInstanceList()
660 if instance.name not in running_instances:
663 hyper = hypervisor.GetHypervisor()
665 hyper.StopInstance(instance)
666 except errors.HypervisorError, err:
667 logging.error("Failed to stop instance")
670 # test every 10secs for 2min
674 for dummy in range(11):
675 if instance.name not in GetInstanceList():
679 # the shutdown did not succeed
680 logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
# escalate: force-destroy the instance after the graceful attempt failed
683 hyper.StopInstance(instance, force=True)
684 except errors.HypervisorError, err:
685 logging.exception("Failed to stop instance")
689 if instance.name in GetInstanceList():
690 logging.error("could not shutdown instance '%s' even by destroy",
# NOTE(review): return lines after the error logs are not shown here
# (numbering gaps).
697 def RebootInstance(instance, reboot_type, extra_args):
698 """Reboot an instance.
701 instance - name of instance to reboot
702 reboot_type - how to reboot [soft,hard,full]
705 running_instances = GetInstanceList()
707 if instance.name not in running_instances:
708 logging.error("Cannot reboot instance that is not running")
711 hyper = hypervisor.GetHypervisor()
# soft reboot: delegate to the hypervisor's own reboot
712 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
714 hyper.RebootInstance(instance)
715 except errors.HypervisorError, err:
716 logging.exception("Failed to soft reboot instance")
# hard reboot: full stop followed by a fresh start
718 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
720 ShutdownInstance(instance)
721 StartInstance(instance, extra_args)
722 except errors.HypervisorError, err:
723 logging.exception("Failed to hard reboot instance")
726 raise errors.ParameterError("reboot_type invalid")
# NOTE(review): the 'try:' matching the except clause and the error-path
# return are not shown here (numbering gaps).
732 def MigrateInstance(instance, target, live):
733 """Migrates an instance to another node.
736 hyper = hypervisor.GetHypervisor()
739 hyper.MigrateInstance(instance, target, live)
740 except errors.HypervisorError, err:
741 msg = "Failed to migrate instance: %s" % str(err)
# NOTE(review): "successfull" is a typo, but it is a runtime return value
# seen by callers; left unchanged here (documentation-only pass).
744 return (True, "Migration successfull")
# NOTE(review): substantial gaps in the embedded numbering — the 'clist'
# setup, the try/Remove sequence and intermediate guards are not shown here.
747 def CreateBlockDevice(disk, size, owner, on_primary, info):
748 """Creates a block device for an instance.
751 disk: a ganeti.objects.Disk object
752 size: the size of the physical underlying device
753 owner: a string with the name of the instance
754 on_primary: a boolean indicating if it is the primary node or not
755 info: string that will be sent to the physical device creation
758 the new unique_id of the device (this can sometime be
759 computed only after creation), or None. On secondary nodes,
760 it's not required to return anything.
# recursively assemble children first so the parent can be created on top
765 for child in disk.children:
766 crdev = _RecursiveAssembleBD(child, owner, on_primary)
767 if on_primary or disk.AssembleOnSecondary():
768 # we need the children open in case the device itself has to
773 device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
774 if device is not None:
775 logging.info("removing existing device %s", disk)
777 except errors.BlockDeviceError, err:
780 device = bdev.Create(disk.dev_type, disk.physical_id,
783 raise ValueError("Can't create child device for %s, %s" %
785 if on_primary or disk.AssembleOnSecondary():
786 if not device.Assemble():
787 errorstring = "Can't assemble device after creation"
788 logging.error(errorstring)
789 raise errors.BlockDeviceError("%s, very unusual event - check the node"
790 " daemon logs" % errorstring)
791 device.SetSyncSpeed(constants.SYNC_SPEED)
792 if on_primary or disk.OpenOnSecondary():
793 device.Open(force=True)
# record the assembled device in the node-local device cache
794 DevCacheManager.UpdateCache(device.dev_path, owner,
795 on_primary, disk.iv_name)
799 physical_id = device.unique_id
# NOTE(review): the 'try:' matching the except clause, the 'rdev is None'
# branch and the return are not shown here (numbering gaps).
803 def RemoveBlockDevice(disk):
804 """Remove a block device.
806 This is intended to be called recursively.
810 # since we are removing the device, allow a partial match
811 # this allows removal of broken mirrors
812 rdev = _RecursiveFindBD(disk, allow_partial=True)
813 except errors.BlockDeviceError, err:
814 # probably can't attach
815 logging.info("Can't attach to device %s in remove", disk)
818 r_path = rdev.dev_path
819 result = rdev.Remove()
821 DevCacheManager.RemoveCache(r_path)
# recurse into children; the overall result is the AND of all removals
825 for child in disk.children:
826 result = result and RemoveBlockDevice(child)
# NOTE(review): substantial gaps — 'children = []' setup, the 'try:' inside
# the loop, the raise on too many failed children and the final returns are
# not shown here.
830 def _RecursiveAssembleBD(disk, owner, as_primary):
831 """Activate a block device for an instance.
833 This is run on the primary and secondary nodes for an instance.
835 This function is called recursively.
838 disk: a objects.Disk object
839 as_primary: if we should make the block device read/write
842 the assembled device or None (in case no device was assembled)
844 If the assembly is not successful, an exception is raised.
# mcn = max number of children that may fail to assemble (None entries)
849 mcn = disk.ChildrenNeeded()
851 mcn = 0 # max number of Nones allowed
853 mcn = len(disk.children) - mcn # max number of Nones
854 for chld_disk in disk.children:
856 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
857 except errors.BlockDeviceError, err:
858 if children.count(None) >= mcn:
861 logging.debug("Error in child activation: %s", str(err))
862 children.append(cdev)
864 if as_primary or disk.AssembleOnSecondary():
865 r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
866 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
868 if as_primary or disk.OpenOnSecondary():
870 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
871 as_primary, disk.iv_name)
# NOTE(review): the final 'return result' is not shown here (numbering gap).
878 def AssembleBlockDevice(disk, owner, as_primary):
879 """Activate a block device for an instance.
881 This is a wrapper over _RecursiveAssembleBD.
884 a /dev path for primary nodes
885 True for secondary nodes
888 result = _RecursiveAssembleBD(disk, owner, as_primary)
# primary nodes get a concrete /dev path instead of the device object
889 if isinstance(result, bdev.BlockDev):
890 result = result.dev_path
# NOTE(review): the 'else: result = True' branch and the return are not
# shown here (numbering gaps).
894 def ShutdownBlockDevice(disk):
895 """Shut down a block device.
897 First, if the device is assembled (can `Attach()`), then the device
898 is shutdown. Then the children of the device are shutdown.
900 This function is called recursively. Note that we don't cache the
901 children or such, as opposed to assemble, shutdown of different
902 devices doesn't require that the upper device was active.
905 r_dev = _RecursiveFindBD(disk)
906 if r_dev is not None:
907 r_path = r_dev.dev_path
908 result = r_dev.Shutdown()
921 DevCacheManager.RemoveCache(r_path)
914 for child in disk.children:
915 result = result and ShutdownBlockDevice(child)
# NOTE(review): return lines after the error logs and the final return are
# not shown here (numbering gaps).
919 def MirrorAddChildren(parent_cdev, new_cdevs):
920 """Extend a mirrored block device.
# allow_partial: the parent may be a degraded mirror we are repairing
923 parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
924 if parent_bdev is None:
925 logging.error("Can't find parent device")
927 new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
928 if new_bdevs.count(None) > 0:
929 logging.error("Can't find new device(s) to add: %s:%s",
930 new_bdevs, new_cdevs)
932 parent_bdev.AddChildren(new_bdevs)
# NOTE(review): the 'devs = []' setup, the static-path branch and the return
# lines are not shown here (numbering gaps).
936 def MirrorRemoveChildren(parent_cdev, new_cdevs):
937 """Shrink a mirrored block device.
940 parent_bdev = _RecursiveFindBD(parent_cdev)
941 if parent_bdev is None:
942 logging.error("Can't find parent in remove children: %s", parent_cdev)
# build the list of /dev paths to detach; prefer a disk's static path,
# otherwise resolve the dynamic device
945 for disk in new_cdevs:
946 rpath = disk.StaticDevPath()
948 bd = _RecursiveFindBD(disk)
950 logging.error("Can't find dynamic device %s while removing children",
954 devs.append(bd.dev_path)
957 parent_bdev.RemoveChildren(devs)
# NOTE(review): the 'stats = []' setup, the loop header over 'disks' and the
# return are not shown here (numbering gaps).
961 def GetMirrorStatus(disks):
962 """Get the mirroring status of a list of devices.
965 disks: list of `objects.Disk`
968 list of (mirror_done, estimated_time) tuples, which
969 are the result of bdev.BlockDevice.CombinedSyncStatus()
974 rbd = _RecursiveFindBD(dsk)
976 raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
977 stats.append(rbd.CombinedSyncStatus())
# NOTE(review): the 'children = []' setup and the partial-match handling
# around the recursion are not shown here (numbering gaps).
981 def _RecursiveFindBD(disk, allow_partial=False):
982 """Check if a device is activated.
984 If so, return information about the real device.
987 disk: the objects.Disk instance
988 allow_partial: don't abort the find if a child of the
989 device can't be found; this is intended to be
990 used when repairing mirrors
993 None if the device can't be found
994 otherwise the device instance
999 for chdisk in disk.children:
1000 children.append(_RecursiveFindBD(chdisk))
1002 return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
# NOTE(review): the 'rbd is None' early return is not shown here
# (numbering gap).
1005 def FindBlockDevice(disk):
1006 """Check if a device is activated.
1008 If so, return information about the real device.
1011 disk: the objects.Disk instance
1013 None if the device can't be found
1014 (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1017 rbd = _RecursiveFindBD(disk)
# flatten path/major/minor plus the three-element sync status tuple
1020 return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
# NOTE(review): the 'allowed_files = [' opening line, return lines after the
# error logs and the final return are not shown here (numbering gaps).
1023 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1024 """Write a file to the filesystem.
1026 This allows the master to overwrite(!) a file. It will only perform
1027 the operation if the file belongs to a list of configuration files.
1030 if not os.path.isabs(file_name):
1031 logging.error("Filename passed to UploadFile is not absolute: '%s'",
# whitelist of overwritable configuration files, plus the ssconf file list
1036 constants.CLUSTER_CONF_FILE,
1037 constants.ETC_HOSTS,
1038 constants.SSH_KNOWN_HOSTS_FILE,
1039 constants.VNC_PASSWORD_FILE,
1040 constants.JOB_QUEUE_SERIAL_FILE,
1042 allowed_files.extend(ssconf.SimpleStore().GetFileList())
1043 if file_name not in allowed_files:
1044 logging.error("Filename passed to UploadFile not in allowed"
1045 " upload targets: '%s'", file_name)
1048 utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1049 atime=atime, mtime=mtime)
# NOTE(review): the 'else' branch (str(err) fallback) and the return are not
# shown here (numbering gaps).
1053 def _ErrnoOrStr(err):
1054 """Format an EnvironmentError exception.
1056 If the `err` argument has an errno attribute, it will be looked up
1057 and converted into a textual EXXXX description. Otherwise the string
1058 representation of the error will be returned.
1061 if hasattr(err, 'errno'):
1062 detail = errno.errorcode[err.errno]
# NOTE(review): the 'try:' lines matching the except clauses, the file
# open/close around the read and the final return are not shown here.
1068 def _OSOndiskVersion(name, os_dir):
1069 """Compute and return the API version of a given OS.
1071 This function will try to read the API version of the os given by
1072 the 'name' parameter and residing in the 'os_dir' directory.
1074 Return value will be either an integer denoting the version or None in the
1075 case when this is not a valid OS name.
1078 api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1081 st = os.stat(api_file)
1082 except EnvironmentError, err:
1083 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1084 " found (%s)" % _ErrnoOrStr(err))
# must be a regular file, not a symlink/device/etc.
1086 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1087 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1093 api_version = f.read(256)
1096 except EnvironmentError, err:
1097 raise errors.InvalidOS(name, os_dir, "error while reading the"
1098 " API version (%s)" % _ErrnoOrStr(err))
1100 api_version = api_version.strip()
1102 api_version = int(api_version)
1103 except (TypeError, ValueError), err:
1104 raise errors.InvalidOS(name, os_dir,
1105 "API version is not integer (%s)" % str(err))
# NOTE(review): the 'result = []' setup, 'try:' lines, 'continue' after the
# listing error and the final return are not shown here (numbering gaps).
1110 def DiagnoseOS(top_dirs=None):
1111 """Compute the validity for all OSes.
1113 Returns an OS object for each name in all the given top directories
1114 (if not given defaults to constants.OS_SEARCH_PATH)
1120 if top_dirs is None:
1121 top_dirs = constants.OS_SEARCH_PATH
1124 for dir_name in top_dirs:
1125 if os.path.isdir(dir_name):
1127 f_names = utils.ListVisibleFiles(dir_name)
1128 except EnvironmentError, err:
1129 logging.exception("Can't list the OS directory %s", dir_name)
1131 for name in f_names:
1133 os_inst = OSFromDisk(name, base_dir=dir_name)
1134 result.append(os_inst)
# invalid OSes are still reported, wrapped in an invalid-OS object
1135 except errors.InvalidOS, err:
1136 result.append(objects.OS.FromInvalidOS(err))
# NOTE(review): the 'if os_dir is None:' guard and 'else:' branch lines are
# not shown here (numbering gaps).
1141 def OSFromDisk(name, base_dir=None):
1142 """Create an OS instance from disk.
1144 This function will return an OS instance if the given name is a
1145 valid OS name. Otherwise, it will raise an appropriate
1146 `errors.InvalidOS` exception, detailing why this is not a valid
1150 os_dir: Directory containing the OS scripts. Defaults to a search
1151 in all the OS_SEARCH_PATH directories.
1155 if base_dir is None:
1156 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1158 raise errors.InvalidOS(name, None, "OS dir not found in search path")
1160 os_dir = os.path.sep.join([base_dir, name])
1162 api_version = _OSOndiskVersion(name, os_dir)
1164 if api_version != constants.OS_API_VERSION:
1165 raise errors.InvalidOS(name, os_dir, "API version mismatch"
1166 " (found %s want %s)"
1167 % (api_version, constants.OS_API_VERSION))
1169 # OS Scripts dictionary, we will populate it with the actual script names
1170 os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
# every script must exist, be a regular file and be owner-executable
1172 for script in os_scripts:
1173 os_scripts[script] = os.path.sep.join([os_dir, script])
1176 st = os.stat(os_scripts[script])
1177 except EnvironmentError, err:
1178 raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1179 (script, _ErrnoOrStr(err)))
1181 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1182 raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1185 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1186 raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1190 return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1191 create_script=os_scripts['create'],
1192 export_script=os_scripts['export'],
1193 import_script=os_scripts['import'],
1194 rename_script=os_scripts['rename'],
1195 api_version=api_version)
# NOTE(review): the guard around the error return, the 'try: r_dev.Grow(...)'
# call and the success return are not shown here (numbering gaps).
1198 def GrowBlockDevice(disk, amount):
1199 """Grow a stack of block devices.
1201 This function is called recursively, with the childrens being the
1205 disk: the disk to be grown
1207 Returns: a tuple of (status, result), with:
1208 status: the result (true/false) of the operation
1209 result: the error message if the operation failed, otherwise not used
1212 r_dev = _RecursiveFindBD(disk)
1214 return False, "Cannot find block device %s" % (disk,)
1218 except errors.BlockDeviceError, err:
1219 return False, str(err)
# NOTE(review): the leading 'if disk.children:'-style dispatch lines are not
# fully shown here (numbering gaps).
1224 def SnapshotBlockDevice(disk):
1225 """Create a snapshot copy of a block device.
1227 This function is called recursively, and the snapshot is actually created
1228 just for the leaf lvm backend device.
1231 disk: the disk to be snapshotted
1234 a config entry for the actual lvm device snapshotted.
1238 if len(disk.children) == 1:
1239 # only one child, let's recurse on it
1240 return SnapshotBlockDevice(disk.children[0])
1242 # more than one child, choose one that matches
1243 for child in disk.children:
1244 if child.size == disk.size:
1245 # return implies breaking the loop
1246 return SnapshotBlockDevice(child)
1247 elif disk.dev_type == constants.LD_LV:
1248 r_dev = _RecursiveFindBD(disk)
1249 if r_dev is not None:
1250 # let's stay on the safe side and ask for the full size, for now
1251 return r_dev.Snapshot(disk.size)
# non-LVM leaf devices cannot be snapshotted
1255 raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1256 " '%s' of type '%s'" %
1257 (disk.unique_id, disk.dev_type))
# NOTE(review): the compression command ('comprcmd') definition and the final
# return lines are not shown here (numbering gaps).
1260 def ExportSnapshot(disk, dest_node, instance):
1261 """Export a block device snapshot to a remote node.
1264 disk: the snapshot block device
1265 dest_node: the node to send the image to
1266 instance: instance being exported
1269 True if successful, False otherwise.
1272 inst_os = OSFromDisk(instance.os)
1273 export_script = inst_os.export_script
1275 logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1276 instance.name, int(time.time()))
1277 if not os.path.exists(constants.LOG_OS_DIR):
1278 os.mkdir(constants.LOG_OS_DIR, 0750)
1280 real_os_dev = _RecursiveFindBD(disk)
1281 if real_os_dev is None:
1282 raise errors.BlockDeviceError("Block device '%s' is not set up" %
# export data lands in a ".new" staging directory on the destination node
1286 destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1287 destfile = disk.physical_id[1]
1289 # the target command is built out of three individual commands,
1290 # which are joined by pipes; we check each individual command for
1293 expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1294 export_script, instance.name,
1295 real_os_dev.dev_path, logfile)
1299 destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1300 destdir, destdir, destfile)
1301 remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
1304 # all commands have been checked, so we're safe to combine them
1305 command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1307 result = utils.RunCmd(command)
1310 logging.error("os snapshot export command '%s' returned error: %s"
1311 " output: %s", command, result.fail_reason, result.output)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # Section describing the export itself.
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  # Section describing the instance being exported.
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
  config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    # NOTE(review): the bridge value argument is elided from this excerpt.
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
  # TODO: redundant: on load can read nics until it doesn't exist
  # FIXME(review): if instance.nics is empty, nic_count was never bound
  # and this raises NameError; it also records the last index, not the
  # number of NICs.
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)

  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    # NOTE(review): the size value argument is elided from this excerpt.
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
  # FIXME(review): same empty-list / last-index issue as nic_count above.
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  # (elided: writing the config to cfo and closing it)

  # Promote the staging directory: drop any stale final directory, then
  # move the freshly-written one into place.
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)
def ExportInfo(dest):
  """Get export configuration information.

  Args:
    dest: directory containing the export

  Returns:
    A serializable config file containing the export info.

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  # NOTE(review): the call that reads cff into config is elided from
  # this excerpt.

  # Sanity check: both mandatory sections must be present.
  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
  # (elided: error branch and the final return of config)
def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
  """Import an os image into an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device
    src_node: node holding the source image
    src_image: path to the source image on src_node

  Returns:
    False in case of error, True otherwise.

  """
  # The OS definition's import script consumes the streamed image.
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    # (elided: early return False)

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    # (elided: early return False)

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    # NOTE(review): the argument continuation of this raise is elided.
    raise errors.BlockDeviceError("Block device '%s' is not set up" %

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    # NOTE(review): the argument continuation of this raise is elided.
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
  real_swap_dev.Open()

  # Per-run log file for the OS import script output.
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  # Source side: cat the image on src_node and pipe it over ssh.
  destcmd = utils.BuildShellCmd('cat %s', src_image)
  # NOTE(review): the argument continuation (presumably destcmd) of the
  # following call is elided here.
  remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,

  # Local side: run the OS import script against the real devices.
  # NOTE(review): the final argument (the logfile) is elided here.
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
                               inst_os.path, import_script, instance.name,
                               real_os_dev.dev_path, real_swap_dev.dev_path,

  # NOTE(review): the `comprcmd` (decompressor stage) assignment is
  # elided from this excerpt.
  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])

  result = utils.RunCmd(command)

  # (elided: `if result.failed:` guard and final returns)
    logging.error("os import command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
  # NOTE(review): the `def ListExports():` header line is elided from
  # this excerpt; what follows is the function's docstring and body.
  """Return a list of exports currently available on this machine.

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  # (elided: the branch taken when the export directory does not exist)
def RemoveExport(export):
  """Remove an existing export from the node.

  Args:
    export: the name of the export to remove

  Returns:
    False in case of error, True otherwise.

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  # NOTE(review): an enclosing try: around the rmtree appears to be
  # elided from this excerpt.
  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.
def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  The devlist argument is a list of tuples (disk, new_logical,
  new_physical). The return value will be a combined boolean result
  (True only if all renames succeeded).

  """
  # NOTE(review): the docstring describes 3-tuples but the loop unpacks
  # 2-tuples (disk, unique_id) — one of the two is stale; confirm.
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    # (elided: dev-is-None handling and the enclosing try:)
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      # The cache entry for the old path is stale after a successful
      # rename, so drop it.
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
  # (elided: combined-result bookkeeping and final return)
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  Args:
    file_storage_dir: string with path

  Returns:
    normalized file_storage_dir (string) if valid, None otherwise

  """
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
  # NOTE(review): os.path.commonprefix compares character by character,
  # not by path component, so a sibling like <base>2 would also pass —
  # consider a component-wise containment check.
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    # (elided: return None for the invalid-path case)
  return file_storage_dir
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  # A falsy result means the path failed validation above.
  if not file_storage_dir:
    # (elided: early error return)

  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      logging.error("'%s' is not a directory", file_storage_dir)
      # (elided: error return)
  # (elided: else: / try: wrapping the directory creation)
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      logging.error("Cannot create file storage directory '%s': %s",
                    file_storage_dir, err)
  # (elided: error/success returns)
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    removal was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  # A falsy result means the path failed validation above.
  if not file_storage_dir:
    # (elided: early error return)

  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      logging.error("'%s' is not a directory", file_storage_dir)
      # (elided: error return)
    # deletes dir only if empty, otherwise we want to return False
    # (elided: try: wrapping the rmdir)
      os.rmdir(file_storage_dir)
    except OSError, err:
      logging.exception("Cannot remove file storage directory '%s'",
  # (elided: message continuation and the function's returns)
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  Args:
    old_file_storage_dir: string containing the old path
    new_file_storage_dir: string containing the new path

  Returns:
    tuple with first element a boolean indicating whether dir
    rename was successful or not

  """
  # Both endpoints must validate against the cluster base directory.
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)

  if not old_file_storage_dir or not new_file_storage_dir:
    # (elided: early error return)

  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      # (elided: try: wrapping the rename)
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        logging.exception("Cannot rename '%s' to '%s'",
                          old_file_storage_dir, new_file_storage_dir)
    # (elided: else: — old path exists but is not a directory)
      logging.error("'%s' is not a directory", old_file_storage_dir)
  # (elided: else: — destination already exists)
    if os.path.exists(old_file_storage_dir):
      logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                    old_file_storage_dir, new_file_storage_dir)
  # (elided: final returns)
def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of DRBD).

  Returns:
    A (success, message) tuple; failure messages aggregate the
    per-device errors.

  """
  # (elided: error accumulator `msg` initialization and the per-disk
  # loop header binding `cf`)
    rd = _RecursiveFindBD(cf)
    # (elided: `if rd is None:` guard)
      return (False, "Can't find device %s" % cf)
    # (elided: try: around the device close call)
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  # (elided: check whether any errors were accumulated)
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  # Every device was switched successfully.
  return (True, "All devices secondary")
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  # Hook file names must fully match this mask to be considered for
  # execution; anything else is skipped.
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  # NOTE(review): a @staticmethod decorator appears to be elided here —
  # the method takes no self/cls parameter.
  def ExecHook(script, env):
    """Exec one hook script.

    Args:
      - script: the full path to the script
      - env: the environment with which to exec the script

    Returns:
      (success, output) where success means the script exited with
      status 0 (see the final return below).

    """
    # exec the process using subprocess and log the output
    fdstdin = open("/dev/null", "r")
    child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT, close_fds=True,
                             shell=False, cwd="/", env=env)

    # (elided: try: around reading the child's output)
      output = child.stdout.read(4096)
      child.stdout.close()
    except EnvironmentError, err:
      output += "Hook script error: %s" % str(err)

    # (elided: retry loop around waiting for the child)
        result = child.wait()
    # Retry the wait if it was interrupted by a signal.
    except EnvironmentError, err:
      if err.errno == errno.EINTR:
        # (elided: continue/raise handling)

    # try not to leak fds
    for fd in (fdstdin, ):
      # (elided: try: fd.close())
      except EnvironmentError, err:
        # just log the error
        #logging.exception("Error while closing fd %s", fd)
        # (elided: pass)

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will not be usually overriden by child opcodes.

    """
    # Map the phase constant to the "pre"/"post" directory suffix.
    if phase == constants.HOOKS_PHASE_PRE:
      # (elided: suffix assignment for the pre phase)
    elif phase == constants.HOOKS_PHASE_POST:
      # (elided: suffix assignment for the post phase)
    # (elided: else:)
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    # (elided: try: around the directory listing)
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # NOTE(review): handler body elided — presumably an early return
      # when the hooks directory does not exist; confirm.

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    # (elided: result-list initialization and sorting of dir_contents)
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # Only regular, executable files whose names match RE_MASK are
      # run; everything else is recorded as skipped.
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
              self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
      # (elided: else: branch running the hook)
        result, output = self.ExecHook(fname, env)
        # (elided: if not result:)
          rrval = constants.HKR_FAIL
        # (elided: else:)
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
    # (elided: return of the accumulated results)
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    Return value: tuple of:
      - run status (one of the IARUN_ constants)
      - stdout of the script (None on lookup failure)
      - stderr of the script (None on lookup failure)
      - fail reason (as from utils.RunResult)

    """
    # Locate the allocator script on the configured search path.
    # NOTE(review): the continuation of this call (presumably the
    # os.path.isfile filter) is elided from this excerpt.
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # The input data is handed to the script via a temporary file.
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    # (elided: try:, writing idata to fd and closing it)
      result = utils.RunCmd([alloc_script, fin_name])
      # (elided: if result.failed:)
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
        # (elided: continuation — the fail reason — and the finally:
        # block removing the temporary file)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  # Cache files live under _ROOT_DIR, one per device, named after the
  # device path with the /dev/ prefix stripped (see _ConvertPath).
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  # NOTE(review): a @classmethod decorator appears to be elided here —
  # the first parameter is cls.
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    # (elided: return fpath)

  # NOTE(review): a @classmethod decorator appears to be elided here.
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    Best-effort: failures to write the cache file are logged, not
    raised (see the except handler below).

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      # (elided: early return)
    fpath = cls._ConvertPath(dev_path)
    # (elided: derivation of `state` from on_primary and the guard that
    # substitutes a placeholder when iv_name is unset)
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    # (elided: try:)
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)

  # NOTE(review): a @classmethod decorator appears to be elided here.
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    Best-effort: failures to remove the cache file are logged, not
    raised.

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      # (elided: early return)
    fpath = cls._ConvertPath(dev_path)
    # (elided: try:)
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      # FIXME(review): the message says "update" but this is the remove
      # path — misleading when grepping logs.
      logging.exception("Can't update bdev cache for %s", dev_path)