4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Functions used by the node daemon"""
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  # NOTE(review): the enclosing 'def' line is not visible in this chunk;
  # confirm the function name against the full file.
  return ssconf.SimpleStore()
def _GetSshRunner(cluster_name):
  """Build and return an SshRunner for this cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, required by the
      SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a newly-constructed SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    # payload was sent verbatim
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # payload was base64-wrapped zlib data
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if not os.path.isdir(path):
    # nothing to clean
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths so the membership test below matches
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    # only remove regular files, never directories or symlinks
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  # NOTE(review): the enclosing 'def' line is not visible in this chunk.
  # The queue lock file is deliberately kept so the queue stays lockable.
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: (master_netdev, master_ip, master_name) if we have a good
      configuration, otherwise (None, None, None)

  """
  # NOTE(review): the enclosing 'def', the assignment of 'cfg' and the
  # opening 'try:' are not visible in this chunk -- confirm against the
  # full file; 'cfg' presumably comes from the SimpleStore wrapper above.
  master_netdev = cfg.GetMasterNetdev()
  master_ip = cfg.GetMasterIP()
  master_node = cfg.GetMasterNode()
  except errors.ConfigurationError, err:
    logging.exception("Cluster configuration incomplete")
    return (None, None, None)
  return (master_netdev, master_ip, master_node)
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # NOTE(review): several statements (early returns, 'if result.failed:'
  # guards and else-branches) are not visible in this chunk -- confirm
  # against the full file.
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    logging.error("Someone else has the master ip, not activating")
  result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  logging.error("Can't activate master IP: %s", result.output)
  result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                         "-s", master_ip, master_ip])
  # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
  for daemon in 'ganeti-masterd', 'ganeti-rapi':
    result = utils.RunCmd([daemon])
    logging.error("Can't start daemon %s: %s", daemon, result.output)
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. It will also stop the master daemons depending on the
  stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # NOTE(review): the early return and the 'if result.failed:' /
  # 'if stop_daemons:' guards are not visible in this chunk.
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  logging.error("Can't remove the master IP, error: %s", result.output)
  # but otherwise ignore the failure

  # stop/kill the rapi and the master daemon
  for daemon in constants.RAPI_PID, constants.MASTERD_PID:
    utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: tuple
  @return: the success of the operation

  """
  # install the provided host keys with private keys kept owner-only
  sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
               (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
               (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
               (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  # NOTE(review): the opening 'try:' and the continuation of the
  # GetUserFiles() call are not visible in this chunk.
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
  except errors.OpExecError, err:
    msg = "Error while processing user ssh files"
    logging.exception(msg)
    return (False, "%s: %s" % (msg, err))

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # restart sshd so the new host keys take effect
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])

  return (True, "Node added successfully")
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  """
  # NOTE(review): the enclosing 'def' line and several statements
  # (try/finally around the file read, f.close()) are not visible here.
  _CleanDirectory(constants.DATA_DIR)

  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")

  f = open(pub_key, 'r')
  utils.RemoveAuthorizedKey(auth_keys, f.read(8192))

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  # NOTE(review): initialization of 'outputarray', the try/finally around
  # the /proc read (f.close()) and the final return are not visible here.
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # the boot id changes on every reboot, so it identifies the current boot
  f = open("/proc/sys/kernel/random/boot_id", 'r')
  outputarray["bootid"] = f.read(128).rstrip("\n")
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  # NOTE(review): initialization of 'result' ({}), several guard/else
  # branches and the final return are not visible in this chunk.
  if constants.NV_HYPERVISOR in what:
    # run each requested hypervisor's own verification routine
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # shuffle so different nodes don't all probe the same targets at once
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if not utils.TcpPing(sip, port, source=my_sip):
          fail.append("secondary")
        tmp[name] = ("failure using the %s interface(s)" %

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = ListVolumeGroups()

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError:
      logging.warning("Can't get used minors list", exc_info=True)
    result[constants.NV_DRBDLIST] = used_minors
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details

  """
  # NOTE(review): initialization of 'lvs' and 'sep', the failure guard
  # and the final return are not visible in this chunk.
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  logging.error("Failed to list logical volumes, lvs output: %s",

  # parse each "name|size|attr" line; the 6-char attr field encodes state
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    match = valid_line_re.match(line)
    logging.error("Invalid line returned from lvs output: '%s'", line)
    name, size, attr = match.groups()
    # attr[4]: '-' means the LV is inactive; attr[5]: 'o' means open
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
  """List all volumes on this node.

  @rtype: list
  @return:
      A list of dictionaries, each having four keys:
        - name: the logical volume name,
        - size: the size of the logical volume
        - dev: the physical device on which the LV lives
        - vg: the volume group to which it belongs

      In case of errors, we return an empty list and log the
      error.

      Note that since a logical volume can live on multiple physical
      volumes, the resulting list might include a logical volume
      multiple times.

  """
  # NOTE(review): the enclosing 'def' line and the inner helper
  # definitions ('parse_dev', 'map_line') are not visible in this chunk;
  # only their bodies appear below.
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--options=lv_name,lv_size,devices,vg_name"])
  logging.error("Failed to list logical volumes, lvs output: %s",

  # strip the "(extent)" suffix lvs appends to the device column
    return dev.split('(')[0]

      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),

  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: the list of bridge names to check
  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      # one missing bridge is enough to fail the whole check
      return False
  return True
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
      - instance1.example.com
      - instance2.example.com

  """
  # NOTE(review): initialization of 'results' ([]), the opening 'try:',
  # the re-raise and the final return are not visible in this chunk.
  for hname in hypervisor_list:
    names = hypervisor.GetHypervisor(hname).ListInstances()
    results.extend(names)
    except errors.HypervisorError, err:
      logging.exception("Error enumerating instances for hypevisor %s", hname)
      # FIXME: should we somehow not propagate this to the master?
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  # NOTE(review): initialization of 'output' ({}) and the final
  # 'return output' are not visible in this chunk.
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # iinfo is a tuple; indices 2/4/5 hold memory, state and cpu time
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if instance.name not in hyper.ListInstances():
    return (False, 'not running')

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
    if not os.path.islink(link_name):
      # a missing disk symlink means the instance was started before the
      # per-disk symlink scheme was introduced
      return (False, 'not restarted since ganeti 1.2.5')

  # all checks passed; always return a (result, description) pair so
  # callers can unpack it uniformly
  return (True, '')
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  # NOTE(review): initialization of 'output', the 'if iinfo:' guard, the
  # construction of 'value' and the final return are not visible here.
  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    for name, inst_id, memory, vcpus, state, times in iinfo:
      # the same instance reported by two hypervisors with different
      # data is a hard error
      if name in output and output[name] != value:
        raise errors.HypervisorError("Instance %s running duplicate"
                                     " with different parameters" % name)
def AddOSToInstance(instance):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @rtype: tuple
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance)

  # per-run logfile, named after the OS, instance and timestamp
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  # NOTE(review): the 'if result.failed:' guard around the error path and
  # the closing argument of the logging call are not visible in this chunk.
  logging.error("os create command '%s' returned error: %s, logfile: %s,"
                " output: %s", result.cmd, result.fail_reason, logfile,
  lines = [val.encode("string_escape")
           for val in utils.TailFile(logfile, lines=20)]
  return (False, "OS create script failed (%s), last lines in the"
          " log file:\n%s" % (result.fail_reason, "\n".join(lines)))

  return (True, "Successfully installed")
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance)
  # the rename script reads the previous name from the environment
  rename_env['OLD_INSTANCE_NAME'] = old_name

  # NOTE(review): the format string has five placeholders but only four
  # arguments are visible -- a continuation line (presumably old_name)
  # appears to be elided here.
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  logging.error("os create command '%s' returned error: %s output: %s",
                result.cmd, result.fail_reason, result.output)
  lines = [val.encode("string_escape")
           for val in utils.TailFile(logfile, lines=20)]
  return (False, "OS rename script failed (%s), last lines in the"
          " log file:\n%s" % (result.fail_reason, "\n".join(lines)))

  return (True, "Rename successful")
def _GetVGInfo(vg_name):
  """Get information about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
      A dictionary with the following keys:
        - C{vg_size} is the total size of the volume group in MiB
        - C{vg_free} is the free size of the volume group in MiB
        - C{pv_count} are the number of physical disks in that VG

      If an error occurs during gathering of data, we return the same dict
      with keys all set to None.

  """
  # all-None defaults double as the error return value
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
  logging.error("volume group %s not present", vg_name)
  valarr = retval.stdout.strip().rstrip(':').split(':')
  # NOTE(review): the guards, the 'retdic = {' opener for the literal
  # below, the opening 'try:' and the final return are not visible here.
  "vg_size": int(round(float(valarr[0]), 0)),
  "vg_free": int(round(float(valarr[1]), 0)),
  "pv_count": int(valarr[2]),
  except ValueError, err:
    logging.exception("Fail to parse vgs output")
  logging.error("vgs output has the wrong number of fields (expected"
                " three): %s", str(valarr))
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Compute the path of the symlink for one instance disk.

  @param instance_name: the name of the owning instance
  @param idx: the index of the disk
  @return: path under constants.DISK_LINKS_DIR, named "instance:idx"

  """
  link_basename = "%s:%d" % (instance_name, idx)
  return os.path.join(constants.DISK_LINKS_DIR, link_basename)
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  # NOTE(review): the try/except OSError wrapper (which defines 'err'),
  # the removal of the stale link and the final return are not visible
  # in this chunk.
  os.symlink(device_path, link_name)
  if err.errno == errno.EEXIST:
    # a link already exists; only replace it if it points elsewhere
    if (not os.path.islink(link_name) or
        os.readlink(link_name) != device_path):
      os.symlink(device_path, link_name)
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  @param instance_name: the name of the owning instance
  @param disks: the instance's disks, used only for their count/indices

  """
  for idx, disk in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      # NOTE(review): the try/os.remove statements appear elided here;
      # the exception log below implies a best-effort removal.
      logging.exception("Can't remove symlink '%s'", link_name)
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  # NOTE(review): initialization of 'block_devices' ([]), the guards
  # before each raise, the try/except around the symlink call and the
  # final return are not visible in this chunk.
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    raise errors.BlockDeviceError("Block device '%s' is not set up." %
    link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
    block_devices.append((disk, link_name))
def StartInstance(instance, extra_args):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: tuple
  @return: whether the startup was successful or not

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    # starting an already-running instance is treated as success
    return (True, "Already running")

  # NOTE(review): the opening 'try:' around the block below is not
  # visible in this chunk.
  block_devices = _GatherAndLinkBlockDevs(instance)
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  hyper.StartInstance(instance, block_devices, extra_args)
  except errors.BlockDeviceError, err:
    logging.exception("Failed to start instance")
    return (False, "Block device error: %s" % str(err))
  except errors.HypervisorError, err:
    logging.exception("Failed to start instance")
    # undo the disk symlinks created above
    _RemoveBlockDevLinks(instance.name, instance.disks)
    return (False, "Hypervisor error: %s" % str(err))

  return (True, "Instance started successfully")
def ShutdownInstance(instance):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @rtype: boolean
  @return: whether the shutdown was successful or not

  """
  # NOTE(review): several statements (early returns, 'try:' openers,
  # time.sleep calls and the loop's break/else) are not visible here.
  hv_name = instance.hypervisor
  running_instances = GetInstanceList([hv_name])

  if instance.name not in running_instances:
  hyper = hypervisor.GetHypervisor(hv_name)
  hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logging.error("Failed to stop instance: %s" % err)

  # test every 10secs for 2min
  for dummy in range(11):
    if instance.name not in GetInstanceList([hv_name]):

  # the shutdown did not succeed
  logging.error("Shutdown of '%s' unsuccessful, using destroy",
  hyper.StopInstance(instance, force=True)
  except errors.HypervisorError, err:
    logging.exception("Failed to stop instance: %s" % err)

  if instance.name in GetInstanceList([hv_name]):
    logging.error("Could not shutdown instance '%s' even by destroy",
  _RemoveBlockDevLinks(instance.name, instance.disks)
def RebootInstance(instance, reboot_type, extra_args):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (full reboot) is not accepted here,
        since that mode is handled
        differently by the master
  @rtype: boolean
  @return: the success of the operation

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")

  # NOTE(review): the 'try:' openers, failure returns and the 'else:'
  # before the final raise are not visible in this chunk.
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      logging.exception("Failed to soft reboot instance")
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    # hard reboot: full stop and start at the hypervisor level
    ShutdownInstance(instance)
    StartInstance(instance, extra_args)
    except errors.HypervisorError, err:
      logging.exception("Failed to hard reboot instance")
  raise errors.ParameterError("reboot_type invalid")
979 def MigrationInfo(instance):
980 """Gather information about an instance to be migrated.
982 @type instance: L{objects.Instance}
983 @param instance: the instance definition
986 hyper = hypervisor.GetHypervisor(instance.hypervisor)
988 info = hyper.MigrationInfo(instance)
989 except errors.HypervisorError, err:
990 msg = "Failed to fetch migration information"
991 logging.exception(msg)
992 return (False, '%s: %s' % (msg, err))
996 def AcceptInstance(instance, info, target):
997 """Prepare the node to accept an instance.
999 @type instance: L{objects.Instance}
1000 @param instance: the instance definition
1001 @type info: string/data (opaque)
1002 @param info: migration information, from the source node
1003 @type target: string
1004 @param target: target host (usually ip), on this node
1007 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1009 hyper.AcceptInstance(instance, info, target)
1010 except errors.HypervisorError, err:
1011 msg = "Failed to accept instance"
1012 logging.exception(msg)
1013 return (False, '%s: %s' % (msg, err))
1014 return (True, "Accept successfull")
1017 def FinalizeMigration(instance, info, success):
1018 """Finalize any preparation to accept an instance.
1020 @type instance: L{objects.Instance}
1021 @param instance: the instance definition
1022 @type info: string/data (opaque)
1023 @param info: migration information, from the source node
1024 @type success: boolean
1025 @param success: whether the migration was a success or a failure
1028 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1030 hyper.FinalizeMigration(instance, info, success)
1031 except errors.HypervisorError, err:
1032 msg = "Failed to finalize migration"
1033 logging.exception(msg)
1034 return (False, '%s: %s' % (msg, err))
1035 return (True, "Migration Finalized")
1038 def MigrateInstance(instance, target, live):
1039 """Migrates an instance to another node.
1041 @type instance: L{objects.Instance}
1042 @param instance: the instance definition
1043 @type target: string
1044 @param target: the target node name
1046 @param live: whether the migration should be done live or not (the
1047 interpretation of this parameter is left to the hypervisor)
1049 @return: a tuple of (success, msg) where:
1050 - succes is a boolean denoting the success/failure of the operation
1051 - msg is a string with details in case of failure
1054 hyper = hypervisor.GetHypervisor(instance.hypervisor)
1057 hyper.MigrateInstance(instance.name, target, live)
1058 except errors.HypervisorError, err:
1059 msg = "Failed to migrate instance"
1060 logging.exception(msg)
1061 return (False, "%s: %s" % (msg, err))
1062 return (True, "Migration successfull")
def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # NOTE(review): initialization of 'clist', the 'if disk.children:'
  # guard, the child Open() call and the 'try:' opener are not visible
  # in this chunk.
  for child in disk.children:
    crdev = _RecursiveAssembleBD(child, owner, on_primary)
    if on_primary or disk.AssembleOnSecondary():
      # we need the children open in case the device itself has to
      # be assembled
  device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
  except errors.GenericError, err:
    return False, "Can't create block device: %s" % str(err)

  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation, very unusual event"
      logging.error(errorstring)
      return False, errorstring
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  physical_id = device.unique_id
  return True, physical_id
def RemoveBlockDevice(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  # NOTE(review): the opening 'try:', the else-branches and the final
  # return are not visible in this chunk.
  rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
  if rdev is not None:
    r_path = rdev.dev_path
    result = rdev.Remove()
    # drop the cached data for the removed device
    DevCacheManager.RemoveCache(r_path)
  for child in disk.children:
    result = result and RemoveBlockDevice(child)
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  # NOTE(review): initialization of 'children' ([]), the if/elif chain
  # around the three 'mcn' assignments, the 'try:' opener, the raise in
  # the error branch and the final returns are not visible in this chunk.
  mcn = disk.ChildrenNeeded()
  mcn = 0  # max number of Nones allowed
  mcn = len(disk.children) - mcn  # max number of Nones
  for chld_disk in disk.children:
    cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
    except errors.BlockDeviceError, err:
      if children.count(None) >= mcn:
      # too few failures to abort: log and continue with cdev=None
      logging.debug("Error in child activation: %s", str(err))
    children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    if as_primary or disk.OpenOnSecondary():
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  result = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(result, bdev.BlockDev):
    # on the primary, expose the device path instead of the object
    result = result.dev_path
  # NOTE(review): the final 'return result' is not visible in this chunk.
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: boolean
  @return: the success of the operation

  """
  # NOTE(review): the guards around the cache removal, the else-branch
  # and the final return are not visible in this chunk.
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
    DevCacheManager.RemoveCache(r_path)
  for child in disk.children:
    result = result and ShutdownBlockDevice(child)
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    # at least one child device could not be found; abort before touching
    # the mirror
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent_bdev.AddChildren(new_bdevs)
  return True
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      # no static path; we must locate the live device to learn its path
      bd = _RecursiveFindBD(disk)
      if bd is None:
        logging.error("Can't find dynamic device %s while removing children",
                      disk)
        return False
      else:
        devs.append(bd.dev_path)
    else:
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
  return True
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
    stats.append(rbd.CombinedSyncStatus())
  return stats
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find
  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    # the children must be resolved first, as FindDevice needs them
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
def FindBlockDevice(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or tuple
  @return: None if the disk cannot be found, otherwise a
      tuple (device_path, major, minor, sync_percent,
      estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    return rbd
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: boolean
  @return: the success of the operation; errors are logged
      in the node daemon log

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # whitelist of files the master is allowed to overwrite on this node
  allowed_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                 " upload targets: '%s'", file_name)
    return False

  raw_data = _Decompress(data)

  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  @type values: dict
  @param values: the new ssconf name/value pairs to write

  """
  ssconf.SimpleStore().WriteFiles(values)
1424 def _ErrnoOrStr(err):
1425 """Format an EnvironmentError exception.
1427 If the L{err} argument has an errno attribute, it will be looked up
1428 and converted into a textual C{E...} description. Otherwise the
1429 string representation of the error will be returned.
1431 @type err: L{EnvironmentError}
1432 @param err: the exception to format
1435 if hasattr(err, 'errno'):
1436 detail = errno.errorcode[err.errno]
def _OSOndiskVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS given by
  the 'name' parameter and residing in the 'os_dir' directory.

  @type name: str
  @param name: the OS name we should look for
  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: list
  @return: the list of API versions declared by the OS
  @raise errors.InvalidOS: if the OS cannot be found or the version
      file is missing/malformed

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError as err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    f = open(api_file)
    try:
      api_versions = f.readlines()
    finally:
      f.close()
  except EnvironmentError as err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  # one version per line; strip whitespace before conversion
  api_versions = [version.strip() for version in api_versions]
  try:
    api_versions = [int(version) for version in api_versions]
  except (TypeError, ValueError) as err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))

  return api_versions
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: an OS object for each name in all the given
      directories

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError as err:
        logging.exception("Can't list the OS directory %s", dir_name)
        break
      for name in f_names:
        try:
          os_inst = OSFromDisk(name, base_dir=dir_name)
          result.append(os_inst)
        except errors.InvalidOS as err:
          # record the invalid OS too, so the master can report it
          result.append(objects.OS.FromInvalidOS(err))

  return result
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{errors.InvalidOS} exception, detailing why this is not a valid OS.

  @type name: str
  @param name: the OS name to look up
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise errors.InvalidOS: if we don't find a valid OS

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_versions = _OSOndiskVersion(name, os_dir)

  if constants.OS_API_VERSION not in api_versions:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_versions, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError as err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    # scripts must be executable regular files
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)

  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
                    api_versions=api_versions)
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    # make sure the device node exists before exporting its path
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    # FIXME: When disks will have read-only mode, populate this
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  return result
def GrowBlockDevice(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @rtype: (status, result)
  @return: a tuple with the status of the operation
      (True/False), and the errors message if status
      is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    return False, "Cannot find block device %s" % (disk,)

  try:
    r_dev.Grow(amount)
  except errors.BlockDeviceError as err:
    return False, str(err)

  return True, None
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      return None
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
  """Export a block device snapshot to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to whom the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @rtype: boolean
  @return: the success of the operation

  """
  export_env = OSEnvironment(instance)

  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0o750)
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_disk.Open()

  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
                               export_script, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command, env=export_env)

  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file
  @rtype: boolean
  @return: the success of the operation

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      # skipped disks appear as None in snap_disks and are not recorded
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # atomically replace any previous export of the same instance
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info, or None if the required sections are
      missing

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    return None

  return config
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
  """Import an os image into an instance.

  @type instance: L{objects.Instance}
  @param instance: instance to import the disks into
  @type src_node: string
  @param src_node: source node for the disk images
  @type src_images: list of string
  @param src_images: absolute paths of the disk images
  @type cluster_name: string
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: list of boolean
  @return: each boolean represent the success of importing the n-th disk

  """
  import_env = OSEnvironment(instance)
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0o750)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
                               import_script, logfile)

  final_result = []
  for idx, image in enumerate(src_images):
    if image:
      # stream the image from the source node, decompress it locally and
      # feed it to the OS import script
      destcmd = utils.BuildShellCmd('cat %s', image)
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                       constants.GANETI_RUNAS,
                                                       destcmd)
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
      import_env['IMPORT_INDEX'] = str(idx)
      result = utils.RunCmd(command, env=import_env)
      if result.failed:
        logging.error("Disk import command '%s' returned error: %s"
                      " output: %s", command, result.fail_reason,
                      result.output)
        final_result.append(False)
      else:
        final_result.append(True)
    else:
      # no image for this disk: nothing to do, count it as success
      final_result.append(True)

  return final_result
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  else:
    # no export directory means no exports
    return []
def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: boolean
  @return: the success of the operation

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.

  return True
def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
      rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError as err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  return result
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check
  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = (True,)
  if not file_storage_dir:
    # path was rejected by _TransformFileStorageDir
    result = (False,)
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = (False,)
    else:
      try:
        os.makedirs(file_storage_dir, 0o750)
      except OSError as err:
        logging.error("Cannot create file storage directory '%s': %s",
                      file_storage_dir, err)
        result = (False,)
  return result
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = (True,)
  if not file_storage_dir:
    # path was rejected by _TransformFileStorageDir
    result = (False,)
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = (False,)
      else:
        # deletes dir only if empty, otherwise we want to return False
        try:
          os.rmdir(file_storage_dir)
        except OSError as err:
          logging.exception("Cannot remove file storage directory '%s'",
                            file_storage_dir)
          result = (False,)
  return result
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  result = (True,)
  if not old_file_storage_dir or not new_file_storage_dir:
    # at least one path was rejected by _TransformFileStorageDir
    result = (False,)
  else:
    if not os.path.exists(new_file_storage_dir):
      if os.path.isdir(old_file_storage_dir):
        try:
          os.rename(old_file_storage_dir, new_file_storage_dir)
        except OSError as err:
          logging.exception("Cannot rename '%s' to '%s'",
                            old_file_storage_dir, new_file_storage_dir)
          result = (False,)
      else:
        logging.error("'%s' is not a directory", old_file_storage_dir)
        result = (False,)
    else:
      if os.path.exists(old_file_storage_dir):
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                      old_file_storage_dir, new_file_storage_dir)
        result = (False,)
  return result
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: boolean
  @return: whether the file is under the queue directory

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))

  return True
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: boolean
  @return: the success of the operation

  """
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  utils.RenameFile(old, new, mkdir=True)

  return True
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: boolean
  @return: always True
  @warning: the function always returns True

  """
  if drain_flag:
    # the presence of the (empty) drain file is the flag itself
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  return True
def CloseBlockDevices(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the succes of the operation, and message
      which will contain the error details in case we
      failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError as err:
      msg.append(str(err))
  if msg:
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)
    return (True, "All devices secondary")
def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the succes of the operation, and message
      which will contain the error details in case we
      failed

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
    return (True, "Validation passed")
  except errors.HypervisorError as err:
    return (False, str(err))
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  @rtype: tuple (success, message)
  @return: success status and error message, if any

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    return (False, "ssconf status shows I'm the master node, will not demote")
  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
    return (False, "The master daemon is running, will not demote")
  try:
    utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError as err:
    # a missing config file is fine (nothing to back up); anything else
    # aborts the demotion
    if err.errno != errno.ENOENT:
      return (False, "Error while backing up cluster file: %s" % str(err))
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
  return (True, "Done")
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @type nodes_ip: dict
  @param nodes_ip: node name to IP mapping, used to resolve the
      physical IDs of the disks
  @type disks: list of L{objects.Disk}
  @param disks: the disks to find
  @rtype: tuple (status, result)
  @return: (True, list of block devices) on success, or
      (False, error message) if any device cannot be found

  """
  # set the correct physical ID
  my_name = utils.HostInfo().name
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)
  return (True, bdevs)
def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  @rtype: tuple (success, message)
  @return: success status and details

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    return status, bdevs

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError as err:
      logging.exception("Failed to go into standalone mode")
      return (False, "Can't change network configuration: %s" % str(err))
  return (True, "All disks are now disconnected")
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  @type multimaster: boolean
  @param multimaster: if True, the devices are also opened in
      primary (multi-master) mode after reconnecting
  @rtype: tuple (success, message)
  @return: success status and details

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    return status, bdevs

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError as err:
        return (False, "Can't create symlink: %s" % str(err))
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError as err:
      return (False, "Can't change network configuration: %s" % str(err))
  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone
  RECONNECT_TIMEOUT = 2 * 60
  sleep_time = 0.100 # start with 100 miliseconds
  timeout_limit = time.time() + RECONNECT_TIMEOUT
  while time.time() < timeout_limit:
    all_connected = True
    for rd in bdevs:
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        all_connected = False
      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.ReAttachNet(multimaster)
        except errors.BlockDeviceError as err:
          return (False, "Can't change network configuration: %s" % str(err))
    if all_connected:
      break
    time.sleep(sleep_time)
    # exponential backoff, capped at five seconds
    sleep_time = min(5, sleep_time * 1.5)
  if not all_connected:
    return (False, "Timeout in disk reconnecting")
  if multimaster:
    # change to primary mode
    for rd in bdevs:
      rd.Open()
  if multimaster:
    msg = "multi-master and primary"
  else:
    msg = "single-master"
  return (True, "Disks are now configured as %s" % msg)
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @rtype: tuple (success, (alldone, min_resync))
  @return: success is False when a device is disconnected;
      alldone tells whether all devices finished resyncing,
      min_resync is the minimum sync percentage across devices

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    return status, bdevs

  min_resync = 100
  alldone = True
  failure = False
  for rd in bdevs:
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      # a disconnected device cannot make progress; report failure
      failure = True
      break
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)
  return (not failure, (alldone, min_resync))
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  # only hook script names matching this pattern are executed
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2377 def __init__(self, hooks_base_dir=None):
2378 """Constructor for hooks runner.
2380 @type hooks_base_dir: str or None
2381 @param hooks_base_dir: if not None, this overrides the
2382 L{constants.HOOKS_BASE_DIR} (useful for unittests)
2385 if hooks_base_dir is None:
2386 hooks_base_dir = constants.HOOKS_BASE_DIR
2387 self._BASE_DIR = hooks_base_dir
2390 def ExecHook(script, env):
2391 """Exec one hook script.
2394 @param script: the full path to the script
2396 @param env: the environment with which to exec the script
2397 @rtype: tuple (success, message)
2398 @return: a tuple of success and message, where success
2399 indicates the succes of the operation, and message
2400 which will contain the error details in case we
2404 # exec the process using subprocess and log the output
2407 fdstdin = open("/dev/null", "r")
2408 child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2409 stderr=subprocess.STDOUT, close_fds=True,
2410 shell=False, cwd="/", env=env)
2413 output = child.stdout.read(4096)
2414 child.stdout.close()
2415 except EnvironmentError, err:
2416 output += "Hook script error: %s" % str(err)
2420 result = child.wait()
2422 except EnvironmentError, err:
2423 if err.errno == errno.EINTR:
2427 # try not to leak fds
2428 for fd in (fdstdin, ):
2432 except EnvironmentError, err:
2433 # just log the error
2434 #logging.exception("Error while closing fd %s", fd)
2437 return result == 0, output
2439 def RunHooks(self, hpath, phase, env):
2440 """Run the scripts in the hooks directory.
2443 @param hpath: the path to the hooks directory which
2446 @param phase: either L{constants.HOOKS_PHASE_PRE} or
2447 L{constants.HOOKS_PHASE_POST}
2449 @param env: dictionary with the environment for the hook
2451 @return: list of 3-element tuples:
2453 - script result, either L{constants.HKR_SUCCESS} or
2454 L{constants.HKR_FAIL}
2455 - output of the script
2457 @raise errors.ProgrammerError: for invalid input
2461 if phase == constants.HOOKS_PHASE_PRE:
2463 elif phase == constants.HOOKS_PHASE_POST:
2466 raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2469 subdir = "%s-%s.d" % (hpath, suffix)
2470 dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2472 dir_contents = utils.ListVisibleFiles(dir_name)
2473 except OSError, err:
2474 # FIXME: must log output in case of failures
2477 # we use the standard python sort order,
2478 # so 00name is the recommended naming scheme
2480 for relname in dir_contents:
2481 fname = os.path.join(dir_name, relname)
2482 if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2483 self.RE_MASK.match(relname) is not None):
2484 rrval = constants.HKR_SKIP
2487 result, output = self.ExecHook(fname, env)
2489 rrval = constants.HKR_FAIL
2491 rrval = constants.HKR_SUCCESS
2492 rr.append(("%s/%s" % (subdir, relname), rrval, output))
# Locates and runs an instance-allocator (iallocator) script, feeding it
# its input data via a temporary file.
# NOTE(review): this extract elides the lines that write idata into the
# temp file, the try/finally that removes fin_name, the trailing
# arguments of the FindFile call (presumably an os.X_OK check), and the
# failure-reason element of the IARUN_FAILURE return — confirm against
# the full source.
2497 class IAllocatorRunner(object):
2498   """IAllocator runner.
2500   This class is instantiated on the node side (ganeti-noded) and not on
2504   def Run(self, name, idata):
2505     """Run an iallocator script.
2508     @param name: the iallocator script name
2510     @param idata: the allocator input data
2513     @return: four element tuple of:
2514       - run status (one of the IARUN_ constants)
2517       - fail reason (as from L{utils.RunResult})
# Search the configured iallocator path(s) for the named script.
2520     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
# Missing script is a distinct, non-exceptional outcome.
2522     if alloc_script is None:
2523       return (constants.IARUN_NOTFOUND, None, None, None)
# The allocator input is passed as a file path, not on stdin.
2525     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2529       result = utils.RunCmd([alloc_script, fin_name])
2531         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
# On success the fail-reason slot of the 4-tuple is None.
2536     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
# Maintains a one-file-per-device cache of block device metadata under
# constants.BDEV_CACHE_DIR. All methods take cls — presumably decorated
# @classmethod in the original (decorator lines are elided in this
# extract), as are the "return fpath" of _ConvertPath, the computation
# of "state" in UpdateCache, and the try: headers pairing with the
# visible except clauses — confirm against the full source.
2539 class DevCacheManager(object):
2540   """Simple class for managing a cache of block device information.
2543   _DEV_PREFIX = "/dev/"
2544   _ROOT_DIR = constants.BDEV_CACHE_DIR
2547   def _ConvertPath(cls, dev_path):
2548     """Converts a /dev/name path to the cache file name.
2550     This replaces slashes with underscores and strips the /dev
2551     prefix. It then returns the full path to the cache file.
2554     @param dev_path: the C{/dev/} path name
2556     @return: the converted path name
# e.g. "/dev/drbd0" -> "<cache_dir>/bdev_drbd0",
# "/dev/disk/by-id/x" -> "<cache_dir>/bdev_disk_by-id_x".
2559     if dev_path.startswith(cls._DEV_PREFIX):
2560       dev_path = dev_path[len(cls._DEV_PREFIX):]
2561     dev_path = dev_path.replace("/", "_")
2562     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2566   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2567     """Updates the cache information for a given device.
2570     @param dev_path: the pathname of the device
2572     @param owner: the owner (instance name) of the device
2573     @type on_primary: bool
2574     @param on_primary: whether this is the primary
2577     @param iv_name: the instance-visible name of the
2578       device, as in objects.Disk.iv_name
# A None path is logged and ignored — cache updates are best-effort
# and must never break the caller.
2583     if dev_path is None:
2584       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2586     fpath = cls._ConvertPath(dev_path)
# NOTE(review): "state" used below is derived from on_primary in lines
# elided from this extract; this line presumably defaults a missing
# iv_name — confirm.
2592       iv_name = "not_visible"
# Cache file format: "<owner> <state> <iv_name>\n".
2593     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2595       utils.WriteFile(fpath, data=fdata)
2596     except EnvironmentError, err:
# Write failures are only logged, never raised.
2597       logging.exception("Can't update bdev cache for %s", dev_path)
2600   def RemoveCache(cls, dev_path):
2601     """Remove data for a dev_path.
2603     This is just a wrapper over L{utils.RemoveFile} with a converted
2604     path name and logging.
2607     @param dev_path: the pathname of the device
# Same best-effort contract as UpdateCache: bad input and I/O errors
# are logged, not raised.
2612     if dev_path is None:
2613       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2615     fpath = cls._ConvertPath(dev_path)
2617       utils.RemoveFile(fpath)
2618     except EnvironmentError, err:
# NOTE(review): message says "update" but this is the removal path —
# likely copy-pasted from UpdateCache; flagging, not changing, since
# this is a runtime string.
2619       logging.exception("Can't update bdev cache for %s", dev_path)