4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon"""
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
50 """Simple wrapper to return a SimpleStore.
52 @rtype: L{ssconf.SimpleStore}
53 @return: a SimpleStore instance
56 return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Build and return an SshRunner for this cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data
  @raise AssertionError: if the encoding marker is unknown

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    # payload was sent verbatim
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # payload is base64-wrapped zlib data
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  Symlinks and subdirectories are left untouched.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths so the membership test below matches
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    # only remove plain files; never follow symlinks
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
119 """Removes job queue files and archived jobs.
124 _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125 _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
129 """Returns master information.
131 This is an utility function to compute master information, either
132 for consumption here or from the node daemon.
135 @return: (master_netdev, master_ip, master_name) if we have a good
136 configuration, otherwise (None, None, None)
141 master_netdev = cfg.GetMasterNetdev()
142 master_ip = cfg.GetMasterIP()
143 master_node = cfg.GetMasterNode()
144 except errors.ConfigurationError, err:
145 logging.exception("Cluster configuration incomplete")
146 return (None, None, None)
147 return (master_netdev, master_ip, master_node)
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @rtype: boolean
  @return: whether all steps succeeded

  """
  ok = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: boolean
  @return: True (IP removal failures are logged but ignored)

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229 """Joins this node to the cluster.
231 This does the following:
232 - updates the hostkeys of the machine (rsa and dsa)
233 - adds the ssh private key to the user
234 - adds the ssh public key to the users' authorized_keys file
237 @param dsa: the DSA private key to write
239 @param dsapub: the DSA public key to write
241 @param rsa: the RSA private key to write
243 @param rsapub: the RSA public key to write
245 @param sshkey: the SSH private key to write
247 @param sshpub: the SSH public key to write
249 @return: the success of the operation
252 sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256 for name, content, mode in sshd_keys:
257 utils.WriteFile(name, data=content, mode=mode)
260 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
262 except errors.OpExecError, err:
263 logging.exception("Error while processing user ssh files")
266 for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
267 utils.WriteFile(name, data=content, mode=0600)
269 utils.AddAuthorizedKey(auth_keys, sshpub)
271 utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
277 """Cleans up and remove the current node.
279 This function cleans up and prepares the current node to be removed
282 If processing is successful, then it raises an
283 L{errors.QuitGanetiException} which is used as a special case to
284 shutdown the node daemon.
287 _CleanDirectory(constants.DATA_DIR)
291 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
292 except errors.OpExecError:
293 logging.exception("Error while processing ssh files")
296 f = open(pub_key, 'r')
298 utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
302 utils.RemoveFile(priv_key)
303 utils.RemoveFile(pub_key)
305 # Return a reassuring string to the caller, and quit
306 raise errors.QuitGanetiException(False, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different informations about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  f = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = f.read(128).rstrip("\n")
  finally:
    f.close()

  return outputarray
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = ListVolumeGroups()

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = constants.PROTOCOL_VERSION

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  return result
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details from lvs

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr positions 4 and 5 encode active/open state
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  # delegate entirely to the utils helper
  groups = utils.ListVolumeGroups()
  return groups
491 """List all volumes on this node.
495 A list of dictionaries, each having four keys:
496 - name: the logical volume name,
497 - size: the size of the logical volume
498 - dev: the physical device on which the LV lives
499 - vg: the volume group to which it belongs
501 In case of errors, we return an empty list and log the
504 Note that since a logical volume can live on multiple physical
505 volumes, the resulting list might include a logical volume
509 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
511 "--options=lv_name,lv_size,devices,vg_name"])
513 logging.error("Failed to list logical volumes, lvs output: %s",
519 return dev.split('(')[0]
525 'name': line[0].strip(),
526 'size': line[1].strip(),
527 'dev': parse_dev(line[2].strip()),
528 'vg': line[3].strip(),
531 return [map_line(line.split('|')) for line in result.stdout.splitlines()
532 if line.count('|') >= 3]
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: the list of bridge names to check
  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      return False

  return True
549 def GetInstanceList(hypervisor_list):
550 """Provides a list of instances.
552 @type hypervisor_list: list
553 @param hypervisor_list: the list of hypervisors to query information
556 @return: a list of all running instances on the current node
557 - instance1.example.com
558 - instance2.example.com
562 for hname in hypervisor_list:
564 names = hypervisor.GetHypervisor(hname).ListInstances()
565 results.extend(names)
566 except errors.HypervisorError, err:
567 logging.exception("Error enumerating instances for hypevisor %s", hname)
568 # FIXME: should we somehow not propagate this to the master?
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance
  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # iinfo is (name, id, memory, vcpus, state, times)
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, inst_id, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        # the same instance reported by two hypervisors must agree
        if name in output and output[name] != value:
          raise errors.HypervisorError("Instance %s running duplicate"
                                       " with different parameters" % name)
        output[name] = value

  return output
def AddOSToInstance(instance):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance)

  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    return False

  return True
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # log message corrected: this is the rename script, not create
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    return False

  return True
696 def _GetVGInfo(vg_name):
697 """Get informations about the volume group.
700 @param vg_name: the volume group which we query
703 A dictionary with the following keys:
704 - C{vg_size} is the total size of the volume group in MiB
705 - C{vg_free} is the free size of the volume group in MiB
706 - C{pv_count} are the number of physical disks in that VG
708 If an error occurs during gathering of data, we return the same dict
709 with keys all set to None.
712 retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
714 retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
715 "--nosuffix", "--units=m", "--separator=:", vg_name])
718 logging.error("volume group %s not present", vg_name)
720 valarr = retval.stdout.strip().rstrip(':').split(':')
724 "vg_size": int(round(float(valarr[0]), 0)),
725 "vg_free": int(round(float(valarr[1]), 0)),
726 "pv_count": int(valarr[2]),
728 except ValueError, err:
729 logging.exception("Fail to parse vgs output")
731 logging.error("vgs output has the wrong number of fields (expected"
732 " three): %s", str(valarr))
736 def _SymlinkBlockDev(instance_name, device_path, device_name):
737 """Set up symlinks to a instance's block device.
739 This is an auxiliary function run when an instance is start (on the primary
740 node) or when an instance is migrated (on the target node).
743 instance_name: the name of the target instance
744 device_path: path of the physical block device, on the node
745 device_name: 'virtual' name of the device
748 absolute path to the disk's symlink
751 link_basename = "%s-%s" % (instance_name, device_name)
752 link_name = os.path.join(constants.DISK_LINKS_DIR, link_basename)
754 os.symlink(device_path, link_name)
756 if e.errno == errno.EEXIST:
757 if (not os.path.islink(link_name) or
758 os.readlink(link_name) != device_path):
760 os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name):
  """Remove the block device symlinks belonging to the given instance.

  @param instance_name: the name of the instance whose links we remove

  """
  for short_name in os.listdir(constants.DISK_LINKS_DIR):
    link_name = os.path.join(constants.DISK_LINKS_DIR, short_name)
    if (os.path.islink(link_name) and
        short_name.startswith('%s-' % instance_name)):
      try:
        os.remove(link_name)
      except OSError:
        # best-effort cleanup; just log the failure
        logging.exception("Can't remove symlink '%s'", link_name)
781 def _GatherAndLinkBlockDevs(instance):
782 """Set up an instance's block device(s).
784 This is run on the primary node at instance startup. The block
785 devices must be already assembled.
787 @type instance: L{objects.Instance}
788 @param instance: the instance whose disks we shoul assemble
790 @return: list of (disk_object, device_path)
794 for idx, disk in enumerate(instance.disks):
795 device = _RecursiveFindBD(disk)
797 raise errors.BlockDeviceError("Block device '%s' is not set up." %
801 link_name = _SymlinkBlockDev(instance.name, device.dev_path,
804 raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
807 block_devices.append((disk, link_name))
812 def StartInstance(instance, extra_args):
813 """Start an instance.
815 @type instance: L{objects.Instance}
816 @param instance: the instance object
818 @return: whether the startup was successful or not
821 running_instances = GetInstanceList([instance.hypervisor])
823 if instance.name in running_instances:
827 block_devices = _GatherAndLinkBlockDevs(instance)
828 hyper = hypervisor.GetHypervisor(instance.hypervisor)
829 hyper.StartInstance(instance, block_devices, extra_args)
830 except errors.BlockDeviceError, err:
831 logging.exception("Failed to start instance")
833 except errors.HypervisorError, err:
834 logging.exception("Failed to start instance")
835 _RemoveBlockDevLinks(instance.name)
841 def ShutdownInstance(instance):
842 """Shut an instance down.
844 @note: this functions uses polling with a hardcoded timeout.
846 @type instance: L{objects.Instance}
847 @param instance: the instance object
849 @return: whether the startup was successful or not
852 hv_name = instance.hypervisor
853 running_instances = GetInstanceList([hv_name])
855 if instance.name not in running_instances:
858 hyper = hypervisor.GetHypervisor(hv_name)
860 hyper.StopInstance(instance)
861 except errors.HypervisorError, err:
862 logging.error("Failed to stop instance")
865 # test every 10secs for 2min
868 for dummy in range(11):
869 if instance.name not in GetInstanceList([hv_name]):
873 # the shutdown did not succeed
874 logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
877 hyper.StopInstance(instance, force=True)
878 except errors.HypervisorError, err:
879 logging.exception("Failed to stop instance")
883 if instance.name in GetInstanceList([hv_name]):
884 logging.error("could not shutdown instance '%s' even by destroy",
888 _RemoveBlockDevLinks(instance.name)
893 def RebootInstance(instance, reboot_type, extra_args):
894 """Reboot an instance.
896 @type instance: L{objects.Instance}
897 @param instance: the instance object to reboot
898 @type reboot_type: str
899 @param reboot_type: the type of reboot, one the following
901 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
902 instance OS, do not recreate the VM
903 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
904 restart the VM (at the hypervisor level)
905 - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
906 is not accepted here, since that mode is handled
909 @return: the success of the operation
912 running_instances = GetInstanceList([instance.hypervisor])
914 if instance.name not in running_instances:
915 logging.error("Cannot reboot instance that is not running")
918 hyper = hypervisor.GetHypervisor(instance.hypervisor)
919 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
921 hyper.RebootInstance(instance)
922 except errors.HypervisorError, err:
923 logging.exception("Failed to soft reboot instance")
925 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
927 ShutdownInstance(instance)
928 StartInstance(instance, extra_args)
929 except errors.HypervisorError, err:
930 logging.exception("Failed to hard reboot instance")
933 raise errors.ParameterError("reboot_type invalid")
938 def MigrateInstance(instance, target, live):
939 """Migrates an instance to another node.
941 @type instance: L{objects.Instance}
942 @param instance: the instance definition
944 @param target: the target node name
946 @param live: whether the migration should be done live or not (the
947 interpretation of this parameter is left to the hypervisor)
949 @return: a tuple of (success, msg) where:
950 - succes is a boolean denoting the success/failure of the operation
951 - msg is a string with details in case of failure
954 hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
957 hyper.MigrateInstance(instance.name, target, live)
958 except errors.HypervisorError, err:
959 msg = "Failed to migrate instance: %s" % str(err)
962 return (True, "Migration successfull")
965 def CreateBlockDevice(disk, size, owner, on_primary, info):
966 """Creates a block device for an instance.
968 @type disk: L{objects.Disk}
969 @param disk: the object describing the disk we should create
971 @param size: the size of the physical underlying device, in MiB
973 @param owner: the name of the instance for which disk is created,
974 used for device cache data
975 @type on_primary: boolean
976 @param on_primary: indicates if it is the primary node or not
978 @param info: string that will be sent to the physical device
979 creation, used for example to set (LVM) tags on LVs
981 @return: the new unique_id of the device (this can sometime be
982 computed only after creation), or None. On secondary nodes,
983 it's not required to return anything.
988 for child in disk.children:
989 crdev = _RecursiveAssembleBD(child, owner, on_primary)
990 if on_primary or disk.AssembleOnSecondary():
991 # we need the children open in case the device itself has to
996 device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
997 if device is not None:
998 logging.info("removing existing device %s", disk)
1000 except errors.BlockDeviceError, err:
1003 device = bdev.Create(disk.dev_type, disk.physical_id,
1006 raise ValueError("Can't create child device for %s, %s" %
1008 if on_primary or disk.AssembleOnSecondary():
1009 if not device.Assemble():
1010 errorstring = "Can't assemble device after creation"
1011 logging.error(errorstring)
1012 raise errors.BlockDeviceError("%s, very unusual event - check the node"
1013 " daemon logs" % errorstring)
1014 device.SetSyncSpeed(constants.SYNC_SPEED)
1015 if on_primary or disk.OpenOnSecondary():
1016 device.Open(force=True)
1017 DevCacheManager.UpdateCache(device.dev_path, owner,
1018 on_primary, disk.iv_name)
1020 device.SetInfo(info)
1022 physical_id = device.unique_id
1026 def RemoveBlockDevice(disk):
1027 """Remove a block device.
1029 @note: This is intended to be called recursively.
1031 @type disk: L{objects.Disk}
1032 @param disk: the disk object we should remove
1034 @return: the success of the operation
1038 rdev = _RecursiveFindBD(disk)
1039 except errors.BlockDeviceError, err:
1040 # probably can't attach
1041 logging.info("Can't attach to device %s in remove", disk)
1043 if rdev is not None:
1044 r_path = rdev.dev_path
1045 result = rdev.Remove()
1047 DevCacheManager.RemoveCache(r_path)
1051 for child in disk.children:
1052 result = result and RemoveBlockDevice(child)
1056 def _RecursiveAssembleBD(disk, owner, as_primary):
1057 """Activate a block device for an instance.
1059 This is run on the primary and secondary nodes for an instance.
1061 @note: this function is called recursively.
1063 @type disk: L{objects.Disk}
1064 @param disk: the disk we try to assemble
1066 @param owner: the name of the instance which owns the disk
1067 @type as_primary: boolean
1068 @param as_primary: if we should make the block device
1071 @return: the assembled device or None (in case no device
1073 @raise errors.BlockDeviceError: in case there is an error
1074 during the activation of the children or the device
1080 mcn = disk.ChildrenNeeded()
1082 mcn = 0 # max number of Nones allowed
1084 mcn = len(disk.children) - mcn # max number of Nones
1085 for chld_disk in disk.children:
1087 cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1088 except errors.BlockDeviceError, err:
1089 if children.count(None) >= mcn:
1092 logging.debug("Error in child activation: %s", str(err))
1093 children.append(cdev)
1095 if as_primary or disk.AssembleOnSecondary():
1096 r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
1097 r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1099 if as_primary or disk.OpenOnSecondary():
1101 DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1102 as_primary, disk.iv_name)
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  result = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(result, bdev.BlockDev):
    # on the primary we return the device path, not the object
    result = result.dev_path
  return result
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successfull), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: boolean
  @return: the success of the operation

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    # not assembled: nothing to shut down
    result = True
  if disk.children:
    for child in disk.children:
      result = result and ShutdownBlockDevice(child)
  return result
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent_bdev.AddChildren(new_bdevs)
  return True
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  # we receive the device list, but we need the paths
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        logging.error("Can't find dynamic device %s while removing children",
                      disk)
        return False
      else:
        devs.append(bd.dev_path)
    else:
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
  return True
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
    stats.append(rbd.CombinedSyncStatus())
  return stats
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
def FindBlockDevice(disk):
  """Check if a device is activated.

  If it is, return informations about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or tuple
  @return: None if the disk cannot be found, otherwise a
      tuple (device_path, major, minor, sync_percent,
      estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    return rbd
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: boolean
  @return: the success of the operation; errors are logged
      in the node daemon log

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # whitelist of files the master may overwrite on this node
  allowed_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
    return False

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
1332 def _ErrnoOrStr(err):
1333 """Format an EnvironmentError exception.
1335 If the L{err} argument has an errno attribute, it will be looked up
1336 and converted into a textual C{E...} description. Otherwise the
1337 string representation of the error will be returned.
1339 @type err: L{EnvironmentError}
1340 @param err: the exception to format
1343 if hasattr(err, 'errno'):
1344 detail = errno.errorcode[err.errno]
1350 def _OSOndiskVersion(name, os_dir):
1351 """Compute and return the API version of a given OS.
1353 This function will try to read the API version of the OS given by
1354 the 'name' parameter and residing in the 'os_dir' directory.
1357 @param name: the OS name we should look for
1359 @param os_dir: the directory inwhich we should look for the OS
1362 Either an integer denoting the version or None in the
1363 case when this is not a valid OS name.
1364 @raise errors.InvalidOS: if the OS cannot be found
1367 api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1370 st = os.stat(api_file)
1371 except EnvironmentError, err:
1372 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1373 " found (%s)" % _ErrnoOrStr(err))
1375 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1376 raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1382 api_versions = f.readlines()
1385 except EnvironmentError, err:
1386 raise errors.InvalidOS(name, os_dir, "error while reading the"
1387 " API version (%s)" % _ErrnoOrStr(err))
1389 api_versions = [version.strip() for version in api_versions]
1391 api_versions = [int(version) for version in api_versions]
1392 except (TypeError, ValueError), err:
1393 raise errors.InvalidOS(name, os_dir,
1394 "API version is not integer (%s)" % str(err))
1399 def DiagnoseOS(top_dirs=None):
1400 """Compute the validity for all OSes.
1402 @type top_dirs: list
1403 @param top_dirs: the list of directories in which to
1404 search (if not given defaults to
1405 L{constants.OS_SEARCH_PATH})
1406 @rtype: list of L{objects.OS}
1407 @return: an OS object for each name in all the given
1411 if top_dirs is None:
1412 top_dirs = constants.OS_SEARCH_PATH
1415 for dir_name in top_dirs:
1416 if os.path.isdir(dir_name):
1418 f_names = utils.ListVisibleFiles(dir_name)
1419 except EnvironmentError, err:
1420 logging.exception("Can't list the OS directory %s", dir_name)
1422 for name in f_names:
1424 os_inst = OSFromDisk(name, base_dir=dir_name)
1425 result.append(os_inst)
1426 except errors.InvalidOS, err:
1427 result.append(objects.OS.FromInvalidOS(err))
1432 def OSFromDisk(name, base_dir=None):
1433 """Create an OS instance from disk.
1435 This function will return an OS instance if the given name is a
1436 valid OS name. Otherwise, it will raise an appropriate
1437 L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1439 @type base_dir: string
1440 @keyword base_dir: Base directory containing OS installations.
1441 Defaults to a search in all the OS_SEARCH_PATH dirs.
1442 @rtype: L{objects.OS}
1443 @return: the OS instance if we find a valid one
1444 @raise errors.InvalidOS: if we don't find a valid OS
1447 if base_dir is None:
1448 os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1450 raise errors.InvalidOS(name, None, "OS dir not found in search path")
1452 os_dir = os.path.sep.join([base_dir, name])
1454 api_versions = _OSOndiskVersion(name, os_dir)
1456 if constants.OS_API_VERSION not in api_versions:
1457 raise errors.InvalidOS(name, os_dir, "API version mismatch"
1458 " (found %s want %s)"
1459 % (api_versions, constants.OS_API_VERSION))
1461 # OS Scripts dictionary, we will populate it with the actual script names
1462 os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1464 for script in os_scripts:
1465 os_scripts[script] = os.path.sep.join([os_dir, script])
1468 st = os.stat(os_scripts[script])
1469 except EnvironmentError, err:
1470 raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1471 (script, _ErrnoOrStr(err)))
1473 if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1474 raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1477 if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1478 raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1482 return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1483 create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1484 export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1485 import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1486 rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1487 api_versions=api_versions)
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if a block device backing the
      instance is not found or cannot be activated

  """
  result = {
    'OS_API_VERSION': '%d' % constants.OS_API_VERSION,
    'INSTANCE_NAME': instance.name,
    'HYPERVISOR': instance.hypervisor,
    'DISK_COUNT': '%d' % len(instance.disks),
    'NIC_COUNT': '%d' % len(instance.nics),
    'DEBUG_LEVEL': '%d' % debug,
    }

  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    # FIXME: When disks will have read-only mode, populate this
    result['DISK_%d_ACCESS' % idx] = 'W'
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]

  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  return result
1537 def GrowBlockDevice(disk, amount):
1538 """Grow a stack of block devices.
1540 This function is called recursively, with the childrens being the
1541 first ones to resize.
1543 @type disk: L{objects.Disk}
1544 @param disk: the disk to be grown
1545 @rtype: (status, result)
1546 @return: a tuple with the status of the operation
1547 (True/False), and the errors message if status
1551 r_dev = _RecursiveFindBD(disk)
1553 return False, "Cannot find block device %s" % (disk,)
1557 except errors.BlockDeviceError, err:
1558 return False, str(err)
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    # more than one child, choose one that matches
    for child in disk.children:
      if child.size == disk.size:
        # return implies breaking the loop
        return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    return None
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))
1598 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1599 """Export a block device snapshot to a remote node.
1601 @type disk: L{objects.Disk}
1602 @param disk: the description of the disk to export
1603 @type dest_node: str
1604 @param dest_node: the destination node to export to
1605 @type instance: L{objects.Instance}
1606 @param instance: the instance object to whom the disk belongs
1607 @type cluster_name: str
1608 @param cluster_name: the cluster name, needed for SSH hostalias
1610 @param idx: the index of the disk in the instance's disk list,
1611 used to export to the OS scripts environment
1613 @return: the success of the operation
1616 export_env = OSEnvironment(instance)
1618 inst_os = OSFromDisk(instance.os)
1619 export_script = inst_os.export_script
1621 logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1622 instance.name, int(time.time()))
1623 if not os.path.exists(constants.LOG_OS_DIR):
1624 os.mkdir(constants.LOG_OS_DIR, 0750)
1625 real_disk = _RecursiveFindBD(disk)
1626 if real_disk is None:
1627 raise errors.BlockDeviceError("Block device '%s' is not set up" %
1631 export_env['EXPORT_DEVICE'] = real_disk.dev_path
1632 export_env['EXPORT_INDEX'] = str(idx)
1634 destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1635 destfile = disk.physical_id[1]
1637 # the target command is built out of three individual commands,
1638 # which are joined by pipes; we check each individual command for
1640 expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1641 export_script, logfile)
1645 destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1646 destdir, destdir, destfile)
1647 remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1648 constants.GANETI_RUNAS,
1651 # all commands have been checked, so we're safe to combine them
1652 command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1654 result = utils.RunCmd(command, env=export_env)
1657 logging.error("os snapshot export command '%s' returned error: %s"
1658 " output: %s", command, result.fail_reason, result.output)
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file
  @rtype: boolean
  @return: the success of the operation

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    # a None entry means the disk was not snapshotted
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export
  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info, or None if the export is invalid

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  # both sections must be present for a valid export
  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    return None

  return config
1753 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1754 """Import an os image into an instance.
1756 @type instance: L{objects.Instance}
1757 @param instance: instance to import the disks into
1758 @type src_node: string
1759 @param src_node: source node for the disk images
1760 @type src_images: list of string
1761 @param src_images: absolute paths of the disk images
1762 @rtype: list of boolean
1763 @return: each boolean represent the success of importing the n-th disk
1766 import_env = OSEnvironment(instance)
1767 inst_os = OSFromDisk(instance.os)
1768 import_script = inst_os.import_script
1770 logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1771 instance.name, int(time.time()))
1772 if not os.path.exists(constants.LOG_OS_DIR):
1773 os.mkdir(constants.LOG_OS_DIR, 0750)
1776 impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1777 import_script, logfile)
1780 for idx, image in enumerate(src_images):
1782 destcmd = utils.BuildShellCmd('cat %s', image)
1783 remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1784 constants.GANETI_RUNAS,
1786 command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1787 import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1788 import_env['IMPORT_INDEX'] = str(idx)
1789 result = utils.RunCmd(command, env=import_env)
1791 logging.error("Disk import command '%s' returned error: %s"
1792 " output: %s", command, result.fail_reason,
1794 final_result.append(False)
1796 final_result.append(True)
1798 final_result.append(True)
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  # no export directory: no exports
  return []
1816 def RemoveExport(export):
1817 """Remove an existing export from the node.
1820 @param export: the name of the export to remove
1822 @return: the success of the operation
1825 target = os.path.join(constants.EXPORT_DIR, export)
1827 shutil.rmtree(target)
1828 # TODO: catch some of the relevant exceptions and provide a pretty
1829 # error message if rmtree fails.
1834 def RenameBlockDevices(devlist):
1835 """Rename a list of block devices.
1837 @type devlist: list of tuples
1838 @param devlist: list of tuples of the form (disk,
1839 new_logical_id, new_physical_id); disk is an
1840 L{objects.Disk} object describing the current disk,
1841 and new logical_id/physical_id is the name we
1844 @return: True if all renames succeeded, False otherwise
1848 for disk, unique_id in devlist:
1849 dev = _RecursiveFindBD(disk)
1854 old_rpath = dev.dev_path
1855 dev.Rename(unique_id)
1856 new_rpath = dev.dev_path
1857 if old_rpath != new_rpath:
1858 DevCacheManager.RemoveCache(old_rpath)
1859 # FIXME: we should add the new cache information here, like:
1860 # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1861 # but we don't have the owner here - maybe parse from existing
1862 # cache? for now, we only lose lvm data when we rename, which
1863 # is less critical than DRBD or MD
1864 except errors.BlockDeviceError, err:
1865 logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check
  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = os.path.normpath(cfg.GetFileStorageDir())
  # NOTE: os.path.commonprefix is character-wise, so it would accept
  # e.g. /srv/ganeti-evil against a base of /srv/ganeti; compare whole
  # path components instead
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.path.sep)):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
1895 def CreateFileStorageDir(file_storage_dir):
1896 """Create file storage directory.
1898 @type file_storage_dir: str
1899 @param file_storage_dir: directory to create
1902 @return: tuple with first element a boolean indicating wheter dir
1903 creation was successful or not
1906 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1908 if not file_storage_dir:
1911 if os.path.exists(file_storage_dir):
1912 if not os.path.isdir(file_storage_dir):
1913 logging.error("'%s' is not a directory", file_storage_dir)
1917 os.makedirs(file_storage_dir, 0750)
1918 except OSError, err:
1919 logging.error("Cannot create file storage directory '%s': %s",
1920 file_storage_dir, err)
1925 def RemoveFileStorageDir(file_storage_dir):
1926 """Remove file storage directory.
1928 Remove it only if it's empty. If not log an error and return.
1930 @type file_storage_dir: str
1931 @param file_storage_dir: the directory we should cleanup
1932 @rtype: tuple (success,)
1933 @return: tuple of one element, C{success}, denoting
1934 whether the operation was successfull
1937 file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1939 if not file_storage_dir:
1942 if os.path.exists(file_storage_dir):
1943 if not os.path.isdir(file_storage_dir):
1944 logging.error("'%s' is not a directory", file_storage_dir)
1946 # deletes dir only if empty, otherwise we want to return False
1948 os.rmdir(file_storage_dir)
1949 except OSError, err:
1950 logging.exception("Cannot remove file storage directory '%s'",
1956 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1957 """Rename the file storage directory.
1959 @type old_file_storage_dir: str
1960 @param old_file_storage_dir: the current path
1961 @type new_file_storage_dir: str
1962 @param new_file_storage_dir: the name we should rename to
1963 @rtype: tuple (success,)
1964 @return: tuple of one element, C{success}, denoting
1965 whether the operation was successful
1968 old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1969 new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1971 if not old_file_storage_dir or not new_file_storage_dir:
1974 if not os.path.exists(new_file_storage_dir):
1975 if os.path.isdir(old_file_storage_dir):
1977 os.rename(old_file_storage_dir, new_file_storage_dir)
1978 except OSError, err:
1979 logging.exception("Cannot rename '%s' to '%s'",
1980 old_file_storage_dir, new_file_storage_dir)
1983 logging.error("'%s' is not a directory", old_file_storage_dir)
1986 if os.path.exists(old_file_storage_dir):
1987 logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1988 old_file_storage_dir, new_file_storage_dir)
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: boolean
  @return: whether the file is under the queue directory

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  norm_name = os.path.normpath(file_name)
  # NOTE: os.path.commonprefix is character-wise and would accept
  # e.g. /var/lib/queue-evil/x against /var/lib/queue; require a real
  # path-component prefix instead
  result = norm_name.startswith(queue_dir + os.path.sep)
  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)
  return result
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @param content: the new job contents (RPC-encoded, see L{_Decompress})
  @rtype: boolean
  @return: the success of the operation

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))

  return True
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: boolean
  @return: the success of the operation

  """
  # both source and destination must stay inside the queue directory
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  utils.RenameFile(old, new, mkdir=True)

  return True
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: boolean
  @return: always True
  @warning: the function always returns True

  """
  # the flag is represented by the mere existence of the file
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  return True
2076 def CloseBlockDevices(disks):
2077 """Closes the given block devices.
2079 This means they will be switched to secondary mode (in case of
2082 @type disks: list of L{objects.Disk}
2083 @param disks: the list of disks to be closed
2084 @rtype: tuple (success, message)
2085 @return: a tuple of success and message, where success
2086 indicates the succes of the operation, and message
2087 which will contain the error details in case we
2093 rd = _RecursiveFindBD(cf)
2095 return (False, "Can't find device %s" % cf)
2102 except errors.BlockDeviceError, err:
2103 msg.append(str(err))
2105 return (False, "Can't make devices secondary: %s" % ",".join(msg))
2107 return (True, "All devices secondary")
2110 def ValidateHVParams(hvname, hvparams):
2111 """Validates the given hypervisor parameters.
2113 @type hvname: string
2114 @param hvname: the hypervisor name
2115 @type hvparams: dict
2116 @param hvparams: the hypervisor parameters to be validated
2117 @rtype: tuple (success, message)
2118 @return: a tuple of success and message, where success
2119 indicates the succes of the operation, and message
2120 which will contain the error details in case we
2125 hv_type = hypervisor.GetHypervisor(hvname)
2126 hv_type.ValidateParameters(hvparams)
2127 return (True, "Validation passed")
2128 except errors.HypervisorError, err:
2129 return (False, str(err))
2133 """Demotes the current node from master candidate role.
2136 # try to ensure we're not the master by mistake
2137 master, myself = ssconf.GetMasterAndMyself()
2138 if master == myself:
2139 return (False, "ssconf status shows I'm the master node, will not demote")
2140 pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2141 if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2142 return (False, "The master daemon is running, will not demote")
2144 utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2145 except EnvironmentError, err:
2146 if err.errno != errno.ENOENT:
2147 return (False, "Error while backing up cluster file: %s" % str(err))
2148 utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2149 return (True, "Done")
2152 class HooksRunner(object):
2155 This class is instantiated on the node side (ganeti-noded) and not
2159 RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2161 def __init__(self, hooks_base_dir=None):
2162 """Constructor for hooks runner.
2164 @type hooks_base_dir: str or None
2165 @param hooks_base_dir: if not None, this overrides the
2166 L{constants.HOOKS_BASE_DIR} (useful for unittests)
2169 if hooks_base_dir is None:
2170 hooks_base_dir = constants.HOOKS_BASE_DIR
2171 self._BASE_DIR = hooks_base_dir
2174 def ExecHook(script, env):
2175 """Exec one hook script.
2178 @param script: the full path to the script
2180 @param env: the environment with which to exec the script
2181 @rtype: tuple (success, message)
2182 @return: a tuple of success and message, where success
2183 indicates the succes of the operation, and message
2184 which will contain the error details in case we
2188 # exec the process using subprocess and log the output
2191 fdstdin = open("/dev/null", "r")
2192 child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2193 stderr=subprocess.STDOUT, close_fds=True,
2194 shell=False, cwd="/", env=env)
2197 output = child.stdout.read(4096)
2198 child.stdout.close()
2199 except EnvironmentError, err:
2200 output += "Hook script error: %s" % str(err)
2204 result = child.wait()
2206 except EnvironmentError, err:
2207 if err.errno == errno.EINTR:
2211 # try not to leak fds
2212 for fd in (fdstdin, ):
2216 except EnvironmentError, err:
2217 # just log the error
2218 #logging.exception("Error while closing fd %s", fd)
2221 return result == 0, output
2223 def RunHooks(self, hpath, phase, env):
2224 """Run the scripts in the hooks directory.
2227 @param hpath: the path to the hooks directory which
2230 @param phase: either L{constants.HOOKS_PHASE_PRE} or
2231 L{constants.HOOKS_PHASE_POST}
2233 @param env: dictionary with the environment for the hook
2235 @return: list of 3-element tuples:
2237 - script result, either L{constants.HKR_SUCCESS} or
2238 L{constants.HKR_FAIL}
2239 - output of the script
2241 @raise errors.ProgrammerError: for invalid input
2245 if phase == constants.HOOKS_PHASE_PRE:
2247 elif phase == constants.HOOKS_PHASE_POST:
2250 raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2253 subdir = "%s-%s.d" % (hpath, suffix)
2254 dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2256 dir_contents = utils.ListVisibleFiles(dir_name)
2257 except OSError, err:
2258 # FIXME: must log output in case of failures
2261 # we use the standard python sort order,
2262 # so 00name is the recommended naming scheme
2264 for relname in dir_contents:
2265 fname = os.path.join(dir_name, relname)
2266 if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2267 self.RE_MASK.match(relname) is not None):
2268 rrval = constants.HKR_SKIP
2271 result, output = self.ExecHook(fname, env)
2273 rrval = constants.HKR_FAIL
2275 rrval = constants.HKR_SUCCESS
2276 rr.append(("%s/%s" % (subdir, relname), rrval, output))
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data
    @rtype: tuple
    @return: four element tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from L{utils.RunResult})

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # the input data is passed to the script through a temporary file
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2323 class DevCacheManager(object):
2324 """Simple class for managing a cache of block device information.
2327 _DEV_PREFIX = "/dev/"
2328 _ROOT_DIR = constants.BDEV_CACHE_DIR
2331 def _ConvertPath(cls, dev_path):
2332 """Converts a /dev/name path to the cache file name.
2334 This replaces slashes with underscores and strips the /dev
2335 prefix. It then returns the full path to the cache file.
2338 @param dev_path: the C{/dev/} path name
2340 @return: the converted path name
2343 if dev_path.startswith(cls._DEV_PREFIX):
2344 dev_path = dev_path[len(cls._DEV_PREFIX):]
2345 dev_path = dev_path.replace("/", "_")
2346 fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2350 def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2351 """Updates the cache information for a given device.
2354 @param dev_path: the pathname of the device
2356 @param owner: the owner (instance name) of the device
2357 @type on_primary: bool
2358 @param on_primary: whether this is the primary
2361 @param iv_name: the instance-visible name of the
2362 device, as in objects.Disk.iv_name
2367 if dev_path is None:
2368 logging.error("DevCacheManager.UpdateCache got a None dev_path")
2370 fpath = cls._ConvertPath(dev_path)
2376 iv_name = "not_visible"
2377 fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2379 utils.WriteFile(fpath, data=fdata)
2380 except EnvironmentError, err:
2381 logging.exception("Can't update bdev cache for %s", dev_path)
2384 def RemoveCache(cls, dev_path):
2385 """Remove data for a dev_path.
2387 This is just a wrapper over L{utils.RemoveFile} with a converted
2388 path name and logging.
2391 @param dev_path: the pathname of the device
2396 if dev_path is None:
2397 logging.error("DevCacheManager.RemoveCache got a None dev_path")
2399 fpath = cls._ConvertPath(dev_path)
2401 utils.RemoveFile(fpath)
2402 except EnvironmentError, err:
2403 logging.exception("Can't update bdev cache for %s", dev_path)