4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
    validity.

    """
    # Check that every opcode parameter named in _OP_REQP is present.
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      # NOTE(review): an "if attr_val is None:" guard appears to be elided
      # from this excerpt; as shown the raise would fire unconditionally.
      raise errors.OpPrereqError("Required parameter '%s' missing" %
    # LUs can only run against an already-initialized cluster.
    if not cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    # All commands must be executed on the master node.
    master = sstore.GetMasterNode()
    if master != utils.HostInfo().name:
      raise errors.OpPrereqError("Commands must be run on the master"

  # NOTE(review): the "def __GetSSH(self):" header of this lazy accessor
  # appears to be elided from this excerpt.
    """Returns the SshRunner object

    (lazily constructs and caches an SshRunner bound to self.sstore)

    """
    self.__ssh = ssh.SshRunner(self.sstore)

  # Expose the lazily-built SshRunner as a read-only property.
  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in the
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  # NOTE(review): the HPATH/HTYPE/_OP_REQP overrides appear to be elided
  # from this excerpt.

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    # NOTE(review): the "return {}, [], []" line appears to be elided here.
def _AddHostToEtcHosts(hostname):
  """Ensure the hosts file has an entry for the given host.

  Thin convenience wrapper around utils.SetEtcHostsEntry: resolves the
  host and registers its IP under the full name plus the short-name alias.

  """
  host_info = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, host_info.ip, host_info.name,
                         [host_info.ShortName()])
def _RemoveHostFromEtcHosts(hostname):
  """Drop a host's entries from the hosts file.

  Thin convenience wrapper around utils.RemoveEtcHostsEntry: removes the
  fully-qualified name first, then the short-name alias.

  """
  host_info = utils.HostInfo(name=hostname)
  for alias in (host_info.name, host_info.ShortName()):
    utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, alias)
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    lu: the logical unit on whose behalf we check (provides lu.cfg)
    nodes: List of nodes (strings) or None for all

  """
  # Validate the argument type before doing any expansion.
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
      # NOTE(review): the "if nodes:" branch and the "for name in nodes:"
      # loop header appear to be elided from this excerpt.
      node = lu.cfg.ExpandNodeName(name)
        # NOTE(review): the "if node is None:" guard appears elided here.
        raise errors.OpPrereqError("No such node name '%s'" % name)
    # No explicit list given: operate on every configured node.
    wanted = lu.cfg.GetNodeList()
  # Stable, human-friendly ordering for the result.
  return utils.NiceSort(wanted)
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    lu: the logical unit on whose behalf we check (provides lu.cfg)
    instances: List of instances (strings) or None for all

  """
  # Validate the argument type before doing any expansion.
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")
    # NOTE(review): the "if instances:" / "wanted = []" scaffolding appears
    # to be elided from this excerpt.
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
        # NOTE(review): the "if instance is None:" guard appears elided.
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
    # No explicit list given: operate on every configured instance.
    wanted = lu.cfg.GetInstanceList()
  # Stable, human-friendly ordering for the result.
  return utils.NiceSort(wanted)
244 def _CheckOutputFields(static, dynamic, selected):
245 """Checks whether all selected fields are valid.
248 static: Static fields
249 dynamic: Dynamic fields
252 static_fields = frozenset(static)
253 dynamic_fields = frozenset(dynamic)
255 all_fields = static_fields | dynamic_fields
257 if not all_fields.issuperset(selected):
258 raise errors.OpPrereqError("Unknown output fields selected: %s"
259 % ",".join(frozenset(selected).
260 difference(all_fields)))
263 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
264 memory, vcpus, nics):
265 """Builds instance related env variables for hooks from single variables.
268 secondary_nodes: List of secondary nodes as strings
272 "INSTANCE_NAME": name,
273 "INSTANCE_PRIMARY": primary_node,
274 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
275 "INSTANCE_OS_TYPE": os_type,
276 "INSTANCE_STATUS": status,
277 "INSTANCE_MEMORY": memory,
278 "INSTANCE_VCPUS": vcpus,
282 nic_count = len(nics)
283 for idx, (ip, bridge, mac) in enumerate(nics):
286 env["INSTANCE_NIC%d_IP" % idx] = ip
287 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
288 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
292 env["INSTANCE_NIC_COUNT"] = nic_count
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  Returns:
    dict of INSTANCE_* environment variables (see _BuildInstanceHookEnv).

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # FIX: was 'instance.os', which exported the OS name as the
    # INSTANCE_STATUS hook variable instead of the run status.
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
319 def _HasValidVG(vglist, vgname):
320 """Checks if the volume group list is valid.
322 A non-None return value means there's an error, and the return value
323 is the error message.
326 vgsize = vglist.get(vgname, None)
328 return "volume group '%s' missing" % vgname
330 return ("volume group '%s' too small (20480MiB required, %dMib found)" %
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.


  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  # Back up and remove any pre-existing keypair before regenerating.
  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  # NOTE(review): the remaining ssh-keygen arguments (key file, -q, empty
  # passphrase) and the "if result.failed:" guard appear to be elided.
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %

  # Add our own freshly generated public key to the authorized list.
  # NOTE(review): the surrounding try/finally (closing the file) appears
  # to be elided from this excerpt.
  f = open(pub_key, 'r')
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  Args:
    ss: the ssconf.SimpleStore instance to write the password into

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  # Generate a self-signed certificate valid for 5 years; key and cert go
  # into the same file.
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  # NOTE(review): the "if result.failed:" guard appears to be elided here.
  raise errors.OpExecError("could not generate server ssl cert, command"
                           " %s had exitcode %s and error message %s" %
                           (result.cmd, result.exit_code, result.output))

  # The cert file contains the private key: restrict to owner read-only.
  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  # NOTE(review): the "if result.failed:" guard appears to be elided here.
  raise errors.OpExecError("Could not start the node daemon, command %s"
                           " had exitcode %s and error %s" %
                           (result.cmd, result.exit_code, result.output))
def _CheckInstanceBridgesExist(instance):
  """Check that the brigdes needed by an instance exist.

  Queries the instance's primary node for every bridge referenced by its
  NICs and raises errors.OpPrereqError if any is absent.

  """
  # check bridges existance
  bridges = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, bridges):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (bridges, instance.primary_node))
class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
              "def_bridge", "master_netdev", "file_storage_dir"]

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    # Refuse to re-initialize an existing cluster.
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    # HVM needs the VNC password file prepared in advance.
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        # NOTE(review): part of this error message appears elided.
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    # The master must resolve to a routable address, not loopback.
    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    # Sanity check: the resolved IP must actually belong to this host.
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    # The cluster IP must not be in use yet.
    # NOTE(review): the remaining TcpPing arguments appear elided.
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    # Optional secondary (replication) IP: validate it if given.
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
        # NOTE(review): the "if (secondary_ip and" condition head appears
        # elided from this excerpt.
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
    self.secondary_ip = secondary_ip

    if not hasattr(self.op, "vg_name"):
      self.op.vg_name = None
    # if vg_name not None, checks if volume group is valid
      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
        # NOTE(review): the "if vgstatus:" guard appears elided here.
        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
                                   " you are not using lvm" % vgstatus)

    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)

    if not os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory you have is"
                                 " not an absolute path.")

    # Create the file storage directory if needed.
    # NOTE(review): the try/except around makedirs appears elided.
    if not os.path.exists(self.op.file_storage_dir):
        os.makedirs(self.op.file_storage_dir, 0750)
        raise errors.OpPrereqError("Cannot create file storage directory"
                                   (self.op.file_storage_dir, err))

    if not os.path.isdir(self.op.file_storage_dir):
      raise errors.OpPrereqError("The file storage directory '%s' is not"
                                 " a directory." % self.op.file_storage_dir)

    # MAC prefix must look like "aa:00:00".
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    # The chosen master network device must exist on this host.
    # NOTE(review): the "if result.failed:" guard appears elided.
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    # NOTE(review): reading the host key line and closing the file appear
    # to be elided from this excerpt.
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)
    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)

    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    # The master itself must be the only remaining node...
    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    # ...and no instances may be left.
    instancelist = self.cfg.GetInstanceList()
      # NOTE(review): the "if instancelist:" guard appears elided here.
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    # Keep a backup of the root ssh keys before the node leaves.
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)
class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Tests performed:
    - compares ganeti version
    - checks vg existance and size > 20G
    - checks config file checksum
    - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      # NOTE(review): an early "return True" appears elided here.

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      # NOTE(review): an early "return True" appears elided here.

    # checks vg existance and size > 20G
    # NOTE(review): the "bad" accumulator setup and the "if not vglist:"
    # branch head appear elided from this excerpt.
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
        # NOTE(review): "if vgstatus:" guard appears elided.
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))

    # checks config file checksum
    if 'filelist' not in node_result:
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
      # NOTE(review): the "else:" branch head appears elided below.
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      feedback_fn(" - ERROR: node hasn't returned node connectivity data")
      # NOTE(review): "else:" branch head appears elided below.
      if node_result['nodelist']:
        # shadows the outer "node" variable inside this loop
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    # NOTE(review): the final "return bad" appears elided.

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn(" - ERROR: instance %s not in instance list %s" %
                  (instance, instancelist))

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    # Map of node -> volumes that *should* be present for this instance.
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          # NOTE(review): the message arguments appear elided here.
          feedback_fn(" - ERROR: volume %s missing on node %s" %

    # A non-down instance must actually be running on its primary node.
    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))

    # And must not be running anywhere else.
    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          # NOTE(review): the message arguments appear elided here.
          feedback_fn(" - ERROR: instance %s should not run on node %s" %

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          # NOTE(review): the message arguments appear elided here.
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    # FIXME: verify OS list
    # Files whose checksum must match on every node.
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
    }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        # a string result means an LVM error message from the node
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        # NOTE(review): a "continue" and the "else:" branch appear elided.
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        # NOTE(review): a "continue" appears elided here.
      node_instance[node] = nodeinstance

    # end data gathering

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      # NOTE(review): the trailing feedback_fn argument appears elided.
      result = self._VerifyInstance(instance, node_volume, node_instance,
      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    # NOTE(review): the trailing feedback_fn argument appears elided.
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,

    feedback_fn("* Verifying remaining instances")
    # NOTE(review): the trailing feedback_fn argument appears elided.
    result = self._VerifyOrphanInstances(instancelist, node_instance,
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    # result is the aggregate 4-tuple; the four names alias its members.
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # Build nv_dict: (node, volume) -> owning instance, for every running
    # network-mirrored instance.
    for inst in instances:
      # NOTE(review): a "continue" for this filter appears elided.
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
          # NOTE(review): the inner "for vol in vol_list:" loop header
          # appears elided.
          nv_dict[(node, vol)] = inst

    node_lvs = rpc.call_volume_list(nodes, vg_name)

      # NOTE(review): the "for node, lvs in node_lvs.iteritems():" loop
      # header appears elided from this excerpt.
      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
        res_nodes.append(node)

      # Any known volume that is offline marks its instance as degraded.
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)
    # NOTE(review): the final "return result" appears elided.
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    # NOTE(review): the "env = {" opener appears elided from this excerpt.
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    # A rename must change at least one of name or IP.
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    # The new IP must not be live anywhere on the network.
    result = utils.RunCmd(["fping", "-q", new_ip])
    if not result.failed:
      # NOTE(review): the trailing format argument appears elided.
      raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                 " reachable on the network. Aborting." %
    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    # NOTE(review): "ip = self.ip" / "ss = self.sstore" bindings appear
    # elided from this excerpt.

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    # NOTE(review): the enclosing try/finally appears elided below.
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            # best-effort distribution: log and continue
            logger.Error("copy of file %s to node %s failed" %

      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  # As excerpted, the children guard and both return paths were missing,
  # so the function always returned None; restored here.
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  # leaf (or no lvm-based child): decide by this device's own type
  return disk.dev_type == constants.LD_LV
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    # NOTE(review): the "env = {" opener appears elided from this excerpt.
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # Disabling lvm storage is only allowed with no lvm-based instances.
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
      # NOTE(review): the "if self.op.vg_name:" branch head appears elided.
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
          # NOTE(review): "if vgstatus:" guard and the format arguments
          # appear elided.
          raise errors.OpPrereqError("Error on node '%s': %s" %

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
      # NOTE(review): the "else:" branch head appears elided here; this
      # message presumably belongs to the no-change case.
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Args:
    cfgw: config writer, used to set the disk IDs for the rpc calls
    instance: the instance whose disks are polled
    proc: processor-like object providing LogInfo/LogWarning
    oneshot: presumably poll only once instead of until synced -- TODO
      confirm (the loop structure is elided from this excerpt)
    unlock: semantics not visible in this excerpt

  Returns:
    True if no disk ended up degraded, False otherwise.

  """
  if not instance.disks:
    # NOTE(review): an early "return True" appears elided here.

  proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  # Make sure the rpc layer can address each disk on the primary node.
  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

    # NOTE(review): the surrounding polling loop (while True / retries)
    # appears elided from this excerpt.
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
      # NOTE(review): the "if not rstats:" guard appears elided.
      proc.LogWarning("Can't get any data from node %s" % node)
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
    for i in range(len(rstats)):
        # NOTE(review): the "mstat = rstats[i]" binding and its None-check
        # appear elided.
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          # NOTE(review): the "else:" branch head appears elided.
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    # back off between polls, capped at one minute
    time.sleep(min(60, max_time))

  proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  # NOTE(review): the "result = True" initialization and the idx selection
  # based on ldisk appear elided from this excerpt.

  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
      # NOTE(review): the "if not rstats:" guard appears elided.
      logger.ToStderr("Can't get any data from node %s" % node)
      result = result and (not rstats[idx])
  # Recurse into children; degradation anywhere fails the whole device.
  # NOTE(review): an "if dev.children:" guard and the final "return result"
  # appear elided.
  for child in dev.children:
    result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    # NOTE(review): the post-processing/return of node_data appears elided
    # from this excerpt.
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allows itself to run.

    """
    # NOTE(review): the "env = {" opener appears elided from this excerpt.
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
    # The node being removed must not see its own removal hooks.
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the node exists in the configuration
    - it does not have primary or secondary instances
    - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    # NOTE(review): an "if node is None:" guard appears elided here; also
    # this uses the obsolete Python 2 comma form of raise -- it should be
    # written raise errors.OpPrereqError("...") like the rest of the file.
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    # The master cannot be removed directly; failover first.
    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    # The node must hold no instances, neither primary nor secondary.
    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    # NOTE(review): the binding of "node" (presumably self.node from
    # CheckPrereq) and the message argument appear elided here.
    logger.Info("stopping the node daemon and removing configs from node %s" %

    rpc.call_node_leave_cluster(node.name)

    # Stop the node daemon via its init script over ssh.
    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  Supports a mix of static fields (read from the configuration) and
  dynamic fields (gathered live via RPC from the nodes themselves).

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # Fields that require a live RPC call to each node to compute.
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Expand/validate the requested node names (empty means all nodes).
    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns a list of rows, one per node, each row holding the values of
    the requested output fields in order.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    # Only pay the RPC cost if at least one dynamic field was requested.
    if self.dynamic_fields.intersection(self.op.output_fields):
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
          # TryConvert keeps the raw value if the conversion fails, so a
          # misbehaving node cannot crash the whole query.
          "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
          "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
          "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
          "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
          "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
          "bootid": nodeinfo['bootid'],
          # Node did not answer: record an empty dict so field lookups
          # below degrade to None instead of raising.
          live_data[name] = {}
      live_data = dict.fromkeys(nodenames, {})

    # Map each node to the sets of instances it hosts.
    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    # Only walk the instance list if an instance-related field was asked for.
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    # Build one output row per node, in the requested field order.
    for node in nodelist:
      for field in self.op.output_fields:
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          # Missing live data yields None rather than an error.
          val = live_data[node.name].get(field, None)
          # Unknown field: should have been caught in CheckPrereq.
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # Expand/validate the requested node names (empty means all nodes).
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of volumes on the requested nodes.

    Returns a list of rows, one per volume, with all values rendered as
    strings.

    """
    nodenames = self.nodes
    # Ask every node for its logical volumes in one RPC.
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    # Per-instance map of node -> LVs, used to attribute a volume to the
    # instance that owns it.
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    for node in nodenames:
      # Skip nodes that did not answer or reported no volumes.
      if node not in volumes or not volumes[node]:

      # Copy before sorting so the RPC result is not mutated.
      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        for field in self.op.output_fields:
          elif field == "phys":
          elif field == "name":
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # Find the instance owning this LV on this node, if any.
            if node not in lv_by_node[inst]:
            if vol['name'] in lv_by_node[inst][node]:
            # Unknown field: should have been caught in CheckPrereq.
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env for node addition.

    This will run on all nodes before, and on all nodes + the new node after.

    """
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
    # Pre hooks run on the current nodes only; post hooks also include
    # the node just added.
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name

    # Resolve the name via DNS; this canonicalizes it and gives us the IP.
    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      # No replication network configured: fall back to single-homed.
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"

    # Neither of the new node's IPs may clash with any existing node's.
    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    # HVM clusters need the shared VNC password file present on all nodes.
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    Transfers the node daemon password and SSL certificate, starts the
    node daemon, verifies connectivity and hostname, distributes ssh
    keys and cluster files, and finally records the node in the config.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    # The password is later embedded in a shell command; reject anything
    # outside a safe character set.
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    gntpem = f.read(8192)

    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    # ask_key=True: the node's host key is not yet in known_hosts.
    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               (node, result.fail_reason, result.output))

    # check connectivity

    result = rpc.call_version([node])[node]

    # The node daemon must speak exactly our protocol version.
    if constants.PROTOCOL_VERSION == result:
      logger.Info("communication to node %s fine, sw version %s match" %
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result))
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)

    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
      keyarray.append(f.read())

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      # Verify the node itself can reach its own claimed secondary IP.
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      # The master already has the up-to-date files.
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          # Best-effort distribution: log and continue.
          logger.Error("copy of file %s to node %s failed" %

    # Push the simple-store files (and VNC password for HVM) to the node.
    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env for master failover.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    # The new master is wherever this command is being run.
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This commands must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      # Best-effort: warn but continue with the takeover.
      logger.Error("could disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    # Record the new master in the simple store and push it everywhere.
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """

  def Exec(self, feedback_fn):
    """Return cluster config.

    Returns a dict of descriptive cluster-wide values (names, versions,
    master node, architecture).

    """
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    # Expand/validate the target node list (empty means all nodes).
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      # Failures are logged per-node but do not abort the operation.
      if not self.ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """

  def CheckPrereq(self):
    """No prerequisites.

    """

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()
class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    Returns a list of (node, output, exit_code) tuples.

    """
    # put the master at the end of the nodes list
    # (so that e.g. a cluster-wide shutdown hits the master last)
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    for node in self.nodes:
      result = self.ssh.Run(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    Assembles the instance's block devices on all involved nodes.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
      raise errors.OpExecError("Cannot activate block devices")
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    cfg: the cluster ConfigWriter
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    suceeded with the mapping from node devices to instance devices

  """
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        # Secondary failures are fatal unless the caller opted out.
        if not ignore_secondaries:

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
      device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  Assembles the instance's disks; on failure, shuts them back down and
  raises OpExecError.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
    # Assembly failed: roll back whatever was brought up.
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    Refuses to shut down the disks while the instance is still running
    on its primary node.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    # A non-list answer means the RPC itself failed.
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"

    _ShutdownInstanceDisks(instance, self.cfg)
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If the ignore_primary is false, errors on the primary node are
  fatal for the result of the function.

  """
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        # Only primary-node failures can be ignored (and only on request).
        if not ignore_primary or node != instance.primary_node:
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function check if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raise an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  # A non-int answer means the node could not compute its free memory.
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
      "FORCE": self.op.force,
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    # Ensure the primary node can actually host the instance's memory.
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    # Mark the instance up in the config before starting it, so a crash
    # mid-operation leaves it flagged for the watcher.
    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      # Roll back the disk assembly on failure.
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    Soft/hard reboots are delegated to the hypervisor on the primary
    node; a full reboot is a shutdown plus a cold start, including disk
    reassembly.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      # Hypervisor-level reboot, no disk handling needed.
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
      # Full reboot: stop the instance, recycle its disks, start again.
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    # Mark it down first so the watcher doesn't restart it mid-shutdown.
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      # Best-effort: log and still tear down the disks below.
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # Double-check against the hypervisor: the config may say "down"
    # while the instance is actually still running.
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    # os_type is optional; when given, the target OS must exist on the
    # primary node.
    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      # Record the OS change in the configuration before reinstalling.
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    feedback_fn("Running the instance OS create scripts...")
    if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
      raise errors.OpExecError("Could not install OS for instance %s"
                               (inst.name, inst.primary_node))
    # Always leave the disks deactivated again.
    _ShutdownInstanceDisks(inst, self.cfg)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running,
    and that the new name resolves and is not already taken.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # Double-check against the hypervisor as well.
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %

    if not getattr(self.op, "ignore_ip", False):
      # Make sure the new name's IP is not already live on the network.
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    Renames it in the configuration, then runs the OS rename script on
    the primary node.

    """
    inst = self.instance
    old_name = inst.name

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    _StartInstanceDisks(self.cfg, inst, None)
    if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
      # The config rename already happened; only warn about the script.
      msg = ("Could run OS rename script for instance %s on node %s (but the"
             " instance has been renamed in Ganeti)" %
             (inst.name, inst.primary_node))
    _ShutdownInstanceDisks(inst, self.cfg)
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    Shuts the instance down, removes its disks, then deletes it from
    the configuration; with ignore_failures set, shutdown/disk errors
    are reported but do not abort the removal.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # fields whose values can only be obtained by querying the nodes live
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    # NOTE(review): the tail of this comprehension appears elided.
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    # only query the nodes if at least one live (dynamic) field was asked for
    if self.dynamic_fields.intersection(self.op.output_fields):
      node_data = rpc.call_all_instances_info(nodes)
      # NOTE(review): the loop over node_data and initialization of
      # live_data/bad_nodes appear elided from this excerpt.
      result = node_data[name]
      live_data.update(result)
      elif result == False:
        bad_nodes.append(name)
      # else no instance is alive
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    for instance in instance_list:
      for field in self.op.output_fields:
        # NOTE(review): the initial branches (e.g. "name", "os") appear
        # elided before this elif chain.
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          # configured (administrative) state, regardless of runtime state
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
            running = bool(live_data.get(instance.name))
            if instance.status != "down":
            if instance.status != "down":
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
        elif field == "disk_template":
          val = instance.disk_template
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          # field[:3] is "sda" or "sdb", the iv_name of the disk
          disk = instance.FindDisk(field[:3])
        elif field == "vcpus":
          val = instance.vcpus
          # unknown field requested; checked in CheckPrereq, so this is
          # a programmer error path
          raise errors.ParameterError(field)
class LUFailoverInstance(LogicalUnit):
  """Failover an instance to its secondary node.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    # NOTE(review): the "env = {" opener appears elided from this excerpt.
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # failover only makes sense for network-mirrored disk templates
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        # only abort for a running instance, unless explicitly overridden
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      # NOTE(review): an "else:" appears elided before this raise.
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                               (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      # NOTE(review): an "if not disks_ok:" guard appears elided here.
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  Recurses into device.children first, then creates the device itself
  via the blockdev_create RPC on the given node.

  """
  for child in device.children:
    if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
      # NOTE(review): the failure-return body appears elided here.

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  # NOTE(review): the check of new_id appears elided from this excerpt.
  if device.physical_id is None:
    device.physical_id = new_id
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    # NOTE(review): the body forcing creation appears elided here.
  for child in device.children:
    if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                      child, force, info):
      # NOTE(review): failure handling appears elided here.

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  # NOTE(review): the check of new_id appears elided from this excerpt.
  if device.physical_id is None:
    device.physical_id = new_id
def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance:
  one cluster-unique id per extension in 'exts', suffixed with that
  extension.

  """
  # NOTE(review): the results-list initialization and the loop over
  # 'exts' appear elided from this excerpt.
  new_id = cfg.GenerateUniqueID()
  results.append("%s%s" % (new_id, val))
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd (LD_DRBD7) device complete with its children.

  Builds two LVs (data of 'size' MB and a 128 MB metadata volume) as
  children of a drbd7 disk object connecting 'primary' and 'secondary'
  on a freshly allocated port.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  # NOTE(review): the return statement appears elided from this excerpt.
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  Same structure as the drbd7 variant: a data LV of 'size' MB plus a
  128 MB metadata LV under an LD_DRBD8 disk on a newly allocated port.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
  # NOTE(review): the iv_name argument and the return statement appear
  # elided from this excerpt.
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  Builds the 'sda' (data, disk_sz MB) and 'sdb' (swap, swap_sz MB)
  disk objects for the diskless, plain-LVM or drbd8 templates.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    # NOTE(review): the diskless branch body appears elided here.
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
    # NOTE(review): an "else:" and the final return appear elided around
    # the following raise.
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2816 def _GetInstanceInfoText(instance):
2817 """Compute that text that should be added to the disk's metadata.
2820 return "originstname+%s" % instance.name
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    cfg: the cluster configuration object
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    # create secondaries first, so mirrors have something to attach to
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        # NOTE(review): the failure return appears elided here.
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
      # NOTE(review): the remainder of this statement and the final
      # return appear elided from this excerpt.
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal proces

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  for device in instance.disks:
    # walk every physical device on every node it lives on
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        # NOTE(review): result tracking and the final return appear
        # elided from this excerpt.
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  Supports two modes: fresh creation (INSTANCE_CREATE) and restore
  from an export (INSTANCE_IMPORT).

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    # NOTE(review): the "env = {" opener appears elided from this excerpt.
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    # NOTE(review): the close of the call above and of the node list
    # below appear elided.
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # make optional parameters exist as attributes, defaulting to None
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)
      # NOTE(review): the check of export_info appears elided before
      # this raise.
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    # NOTE(review): the "if pnode is None:" guard appears elided here.
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
    self.op.pnode = pnode.name

    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    # Required free disk space as a function of disk and swap space
    # NOTE(review): the "req_size_dict = {" opener appears elided here.
      constants.DT_DISKLESS: None,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError("Disk template '%s' size requirement"
                                   " is unknown" % self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        # NOTE(review): the "if not info:" guard appears elided here.
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % nodeinfo)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    # NOTE(review): the check of os_obj appears elided before this raise.
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # instance verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      # NOTE(review): setting inst_ip to None appears elided here.
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
      # NOTE(review): the explicit-IP "else:" branch header appears
      # elided before the validity check below.
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
    self.inst_ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    # NOTE(review): the if/else selecting between default and supplied
    # bridge appears elided here.
      self.op.bridge = self.cfg.GetDefBridge()
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")

    # NOTE(review): the if/else on self.op.start appears elided here.
      self.instance_status = 'up'
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
      # NOTE(review): the "else:" branch header appears elided here.
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
      # NOTE(review): the "else: network_port = None" branch appears
      # elided here.

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # roll back any devices that did get created
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)

    # NOTE(review): the "if disk_abort:" guard appears elided before
    # this rollback block.
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   (instance, pnode_name))
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"

    # NOTE(review): the "if self.op.start:" guard appears elided here.
    logger.Info("starting instance %s on node %s" % (instance, pnode_name))
    feedback_fn("* starting instance...")
    if not rpc.call_instance_start(pnode_name, iobj, None):
      raise errors.OpExecError("Could not start instance")
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    Returns the ssh command line for the caller to execute.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)
    # build the ssh invocation to run on the master
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3255 class LUReplaceDisks(LogicalUnit):
3256 """Replace the disks of an instance.
3259 HPATH = "mirrors-replace"
3260 HTYPE = constants.HTYPE_INSTANCE
3261 _OP_REQP = ["instance_name", "mode", "disks"]
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    # NOTE(review): the "env = {" opener appears elided from this excerpt.
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    # NOTE(review): the "nl = [" opener appears elided here.
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and validates the
    replacement mode against the instance's disk template and the
    optional new secondary node.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      # NOTE(review): the "else:" branch header appears elided here.
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7

    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
        # NOTE(review): the final "else:" appears elided before this raise.
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node
  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance (remote_raid1 template).

    For each disk, a new drbd mirror branch is created on the (possibly
    new) secondary and on the primary, attached to the md device, synced,
    and then the old mirror component is removed.

    """
    instance = self.instance
    # NOTE(review): the assignments of iv_names/cfg appear elided here.
    if self.op.remote_node is None:
      remote_node = self.sec_node
      # NOTE(review): an "else:" header appears elided here.
      remote_node = self.op.remote_node

    for dev in instance.disks:
      # NOTE(review): the size assignment appears elided here.
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %

      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %

      logger.Info("adding new mirror component on primary")
      # create the bdev on the primary node
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
        logger.Error("Can't add mirror compoment to md!")
        # roll back both new devices
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      # NOTE(review): the "if is_degr:" guard appears elided here.
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      # NOTE(review): the "if is_degr:" guard appears elided here.
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")

      # the logical_id of an LV holds (node, node, ...) pairs; remove on both
      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)
# Replace the local storage (LVM data+meta volumes) backing drbd8 disks on one
# node (primary or secondary), while keeping the DRBD device itself intact.
# NOTE(review): this extract is missing interior source lines (the embedded
# original numbering is non-contiguous), so several conditions, loop bodies
# and call continuations are not visible here; code is kept byte-identical.
3459 def _ExecD8DiskOnly(self, feedback_fn):
3460 """Replace a disk on the primary or secondary for dbrd8.
3462 The algorithm for replace is quite complicated:
3463 - for each disk to be replaced:
3464 - create new LVs on the target node with unique names
3465 - detach old LVs from the drbd device
3466 - rename old LVs to name_replaced.<time_t>
3467 - rename new LVs to old LVs
3468 - attach the new LVs (with the old names now) to the drbd device
3469 - wait for sync across all devices
3470 - for each modified disk:
3471 - remove old LVs (which have the name name_replaces.<time_t>)
3473 Failures are not very well handled.
3477 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3478 instance = self.instance
3480 vgname = self.cfg.GetVGName()
# tgt_node holds the node whose storage is replaced; oth_node is its DRBD peer.
3483 tgt_node = self.tgt_node
3484 oth_node = self.oth_node
3486 # Step: check device activation
3487 self.proc.LogStep(1, steps_total, "check device existence")
3488 info("checking volume groups")
3489 my_vg = cfg.GetVGName()
3490 results = rpc.call_vg_list([oth_node, tgt_node])
3492 raise errors.OpExecError("Can't list volume groups on the nodes")
# Both nodes must report the cluster's volume group before we touch anything.
3493 for node in oth_node, tgt_node:
3494 res = results.get(node, False)
3495 if not res or my_vg not in res:
3496 raise errors.OpExecError("Volume group '%s' not found on %s" %
3498 for dev in instance.disks:
3499 if not dev.iv_name in self.op.disks:
3501 for node in tgt_node, oth_node:
3502 info("checking %s on %s" % (dev.iv_name, node))
3503 cfg.SetDiskID(dev, node)
3504 if not rpc.call_blockdev_find(node, dev):
3505 raise errors.OpExecError("Can't find device %s on node %s" %
3506 (dev.iv_name, node))
3508 # Step: check other node consistency
3509 self.proc.LogStep(2, steps_total, "check peer consistency")
3510 for dev in instance.disks:
3511 if not dev.iv_name in self.op.disks:
3513 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
# The peer must be healthy, since it is the only good copy during the replace.
3514 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3515 oth_node==instance.primary_node):
3516 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3517 " to replace disks on this node (%s)" %
3518 (oth_node, tgt_node))
3520 # Step: create new storage
3521 self.proc.LogStep(3, steps_total, "allocate new storage")
3522 for dev in instance.disks:
3523 if not dev.iv_name in self.op.disks:
3526 cfg.SetDiskID(dev, tgt_node)
# Each drbd8 disk is backed by a data LV and a (fixed 128M here) metadata LV.
3527 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3528 names = _GenerateUniqueNames(cfg, lv_names)
3529 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3530 logical_id=(vgname, names[0]))
3531 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3532 logical_id=(vgname, names[1]))
3533 new_lvs = [lv_data, lv_meta]
3534 old_lvs = dev.children
# iv_names maps disk name -> (drbd dev, old backing LVs, new backing LVs)
# for the later rename/attach/cleanup passes.
3535 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3536 info("creating new local storage on %s for %s" %
3537 (tgt_node, dev.iv_name))
3538 # since we *always* want to create this LV, we use the
3539 # _Create...OnPrimary (which forces the creation), even if we
3540 # are talking about the secondary node
3541 for new_lv in new_lvs:
3542 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3543 _GetInstanceInfoText(instance)):
3544 raise errors.OpExecError("Failed to create new LV named '%s' on"
3546 (new_lv.logical_id[1], tgt_node))
3548 # Step: for each lv, detach+rename*2+attach
3549 self.proc.LogStep(4, steps_total, "change drbd configuration")
3550 for dev, old_lvs, new_lvs in iv_names.itervalues():
3551 info("detaching %s drbd from local storage" % dev.iv_name)
3552 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3553 raise errors.OpExecError("Can't detach drbd from local storage on node"
3554 " %s for device %s" % (tgt_node, dev.iv_name))
3556 #cfg.Update(instance)
3558 # ok, we created the new LVs, so now we know we have the needed
3559 # storage; as such, we proceed on the target node to rename
3560 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3561 # using the assumption that logical_id == physical_id (which in
3562 # turn is the unique_id on that node)
3564 # FIXME(iustin): use a better name for the replaced LVs
# The epoch-seconds suffix makes the "_replaced-" names unique per run.
3565 temp_suffix = int(time.time())
3566 ren_fn = lambda d, suff: (d.physical_id[0],
3567 d.physical_id[1] + "_replaced-%s" % suff)
3568 # build the rename list based on what LVs exist on the node
3570 for to_ren in old_lvs:
3571 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3572 if find_res is not None: # device exists
3573 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3575 info("renaming the old LVs on the target node")
3576 if not rpc.call_blockdev_rename(tgt_node, rlist):
3577 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3578 # now we rename the new LVs to the old LVs
3579 info("renaming the new LVs on the target node")
3580 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3581 if not rpc.call_blockdev_rename(tgt_node, rlist):
3582 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
# After the on-node renames, update the config objects to match reality:
# the new LVs take over the old logical ids, the old LVs get the temp names.
3584 for old, new in zip(old_lvs, new_lvs):
3585 new.logical_id = old.logical_id
3586 cfg.SetDiskID(new, tgt_node)
3588 for disk in old_lvs:
3589 disk.logical_id = ren_fn(disk, temp_suffix)
3590 cfg.SetDiskID(disk, tgt_node)
3592 # now that the new lvs have the old name, we can add them to the device
3593 info("adding new mirror component on %s" % tgt_node)
3594 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
# Attach failed: best-effort rollback by deleting the just-created LVs.
3595 for new_lv in new_lvs:
3596 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3597 warning("Can't rollback device %s", hint="manually cleanup unused"
3599 raise errors.OpExecError("Can't add local storage to drbd")
3601 dev.children = new_lvs
3602 cfg.Update(instance)
3604 # Step: wait for sync
3606 # this can fail as the old devices are degraded and _WaitForSync
3607 # does a combined result over all disks, so we don't check its
3609 self.proc.LogStep(5, steps_total, "sync devices")
3610 _WaitForSync(cfg, instance, self.proc, unlock=True)
3612 # so check manually all the devices
3613 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3614 cfg.SetDiskID(dev, instance.primary_node)
# NOTE(review): index [5] is presumably the 'is_degraded' field of the
# blockdev_find result tuple — confirm against the node-side RPC backend.
3615 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3617 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3619 # Step: remove old storage
3620 self.proc.LogStep(6, steps_total, "removing old storage")
3621 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3622 info("remove logical volumes for %s" % name)
3624 cfg.SetDiskID(lv, tgt_node)
# Removal failures are only warnings: the replace itself already succeeded.
3625 if not rpc.call_blockdev_remove(tgt_node, lv):
3626 warning("Can't remove old LV", hint="manually remove unused LVs")
# Move the secondary of a drbd8 instance to a new node: build storage on the
# new node, disconnect the primary, then re-'find' it so it reconnects there.
# NOTE(review): interior source lines are missing from this extract (original
# numbering skips); code is kept byte-identical, comments only are added.
3629 def _ExecD8Secondary(self, feedback_fn):
3630 """Replace the secondary node for drbd8.
3632 The algorithm for replace is quite complicated:
3633 - for all disks of the instance:
3634 - create new LVs on the new node with same names
3635 - shutdown the drbd device on the old secondary
3636 - disconnect the drbd network on the primary
3637 - create the drbd device on the new secondary
3638 - network attach the drbd on the primary, using an artifice:
3639 the drbd code for Attach() will connect to the network if it
3640 finds a device which is connected to the good local disks but
3642 - wait for sync across all devices
3643 - remove all disks from the old secondary
3645 Failures are not very well handled.
3649 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3650 instance = self.instance
3652 vgname = self.cfg.GetVGName()
# old_node: the secondary being replaced; new_node: its replacement.
3655 old_node = self.tgt_node
3656 new_node = self.new_node
3657 pri_node = instance.primary_node
3659 # Step: check device activation
3660 self.proc.LogStep(1, steps_total, "check device existence")
3661 info("checking volume groups")
3662 my_vg = cfg.GetVGName()
3663 results = rpc.call_vg_list([pri_node, new_node])
3665 raise errors.OpExecError("Can't list volume groups on the nodes")
3666 for node in pri_node, new_node:
3667 res = results.get(node, False)
3668 if not res or my_vg not in res:
3669 raise errors.OpExecError("Volume group '%s' not found on %s" %
3671 for dev in instance.disks:
3672 if not dev.iv_name in self.op.disks:
3674 info("checking %s on %s" % (dev.iv_name, pri_node))
3675 cfg.SetDiskID(dev, pri_node)
3676 if not rpc.call_blockdev_find(pri_node, dev):
3677 raise errors.OpExecError("Can't find device %s on node %s" %
3678 (dev.iv_name, pri_node))
3680 # Step: check other node consistency
3681 self.proc.LogStep(2, steps_total, "check peer consistency")
3682 for dev in instance.disks:
3683 if not dev.iv_name in self.op.disks:
3685 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
# ldisk=True: the primary's *local* disk must be consistent, since the old
# secondary is about to be dropped and the primary becomes the only copy.
3686 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3687 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3688 " unsafe to replace the secondary" %
3691 # Step: create new storage
3692 self.proc.LogStep(3, steps_total, "allocate new storage")
3693 for dev in instance.disks:
3695 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3696 # since we *always* want to create this LV, we use the
3697 # _Create...OnPrimary (which forces the creation), even if we
3698 # are talking about the secondary node
3699 for new_lv in dev.children:
3700 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3701 _GetInstanceInfoText(instance)):
3702 raise errors.OpExecError("Failed to create new LV named '%s' on"
3704 (new_lv.logical_id[1], new_node))
3706 iv_names[dev.iv_name] = (dev, dev.children)
3708 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3709 for dev in instance.disks:
3711 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3712 # create new devices on new_node
3713 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3714 logical_id=(pri_node, new_node,
3716 children=dev.children)
3717 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3719 _GetInstanceInfoText(instance)):
3720 raise errors.OpExecError("Failed to create new DRBD on"
3721 " node '%s'" % new_node)
3723 for dev in instance.disks:
3724 # we have new devices, shutdown the drbd on the old secondary
3725 info("shutting down drbd for %s on old node" % dev.iv_name)
3726 cfg.SetDiskID(dev, old_node)
# Failure here is only a warning: the old secondary is being abandoned anyway.
3727 if not rpc.call_blockdev_shutdown(old_node, dev):
3728 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3729 hint="Please cleanup this device manually as soon as possible")
3731 info("detaching primary drbds from the network (=> standalone)")
3733 for dev in instance.disks:
3734 cfg.SetDiskID(dev, pri_node)
3735 # set the physical (unique in bdev terms) id to None, meaning
3736 # detach from network
3737 dev.physical_id = (None,) * len(dev.physical_id)
3738 # and 'find' the device, which will 'fix' it to match the
3740 if rpc.call_blockdev_find(pri_node, dev):
3743 warning("Failed to detach drbd %s from network, unusual case" %
3747 # no detaches succeeded (very unlikely)
3748 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3750 # if we managed to detach at least one, we update all the disks of
3751 # the instance to point to the new secondary
3752 info("updating instance configuration")
3753 for dev in instance.disks:
3754 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3755 cfg.SetDiskID(dev, pri_node)
3756 cfg.Update(instance)
3758 # and now perform the drbd attach
3759 info("attaching primary drbds to new secondary (standalone => connected)")
3761 for dev in instance.disks:
3762 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3763 # since the attach is smart, it's enough to 'find' the device,
3764 # it will automatically activate the network, if the physical_id
3766 cfg.SetDiskID(dev, pri_node)
3767 if not rpc.call_blockdev_find(pri_node, dev):
3768 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3769 "please do a gnt-instance info to see the status of disks")
3771 # this can fail as the old devices are degraded and _WaitForSync
3772 # does a combined result over all disks, so we don't check its
3774 self.proc.LogStep(5, steps_total, "sync devices")
3775 _WaitForSync(cfg, instance, self.proc, unlock=True)
3777 # so check manually all the devices
3778 for name, (dev, old_lvs) in iv_names.iteritems():
3779 cfg.SetDiskID(dev, pri_node)
# NOTE(review): index [5] is presumably the 'is_degraded' field of the
# blockdev_find result tuple — confirm against the node-side RPC backend.
3780 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3782 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3784 self.proc.LogStep(6, steps_total, "removing old storage")
3785 for name, (dev, old_lvs) in iv_names.iteritems():
3786 info("remove logical volumes for %s" % name)
3788 cfg.SetDiskID(lv, old_node)
3789 if not rpc.call_blockdev_remove(old_node, lv):
3790 warning("Can't remove LV on old secondary",
3791 hint="Cleanup stale volumes by hand")
# Dispatcher: pick the concrete replace-disks implementation based on the
# instance's disk template.  NOTE(review): the DT_REMOTE_RAID1 branch's
# assignment to fn is not visible in this extract (a line is missing).
3793 def Exec(self, feedback_fn):
3794 """Execute disk replacement.
3796 This dispatches the disk replacement to the appropriate handler.
3799 instance = self.instance
3800 if instance.disk_template == constants.DT_REMOTE_RAID1:
3802 elif instance.disk_template == constants.DT_DRBD8:
# remote_node unset => replace local storage only; set => replace secondary.
3803 if self.op.remote_node is None:
3804 fn = self._ExecD8DiskOnly
3806 fn = self._ExecD8Secondary
3808 raise errors.ProgrammerError("Unhandled disk replacement case")
3809 return fn(feedback_fn)
# LU returning detailed runtime data (config state, run state, disk status
# tree) for a set of instances, or for all instances when none are named.
# NOTE(review): interior lines are missing from this extract (loop headers,
# returns, dict openers); code kept byte-identical, comments only added.
3812 class LUQueryInstanceData(NoHooksLU):
3813 """Query runtime instance data.
3816 _OP_REQP = ["instances"]
3818 def CheckPrereq(self):
3819 """Check prerequisites.
3821 This only checks the optional instance list against the existing names.
3824 if not isinstance(self.op.instances, list):
3825 raise errors.OpPrereqError("Invalid argument type 'instances'")
3826 if self.op.instances:
3827 self.wanted_instances = []
3828 names = self.op.instances
# Each requested name is expanded and must resolve to a known instance.
3830 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3831 if instance is None:
3832 raise errors.OpPrereqError("No such instance name '%s'" % name)
3833 self.wanted_instances.append(instance)
# Empty request list => query every instance in the cluster.
3835 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3836 in self.cfg.GetInstanceList()]
# Recursively build a status dict for one block device and its children,
# querying the primary node and (for DRBD devices) the secondary too.
3840 def _ComputeDiskStatus(self, instance, snode, dev):
3841 """Compute block device status.
3844 self.cfg.SetDiskID(dev, instance.primary_node)
3845 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3846 if dev.dev_type in constants.LDS_DRBD:
3847 # we change the snode then (otherwise we use the one passed in)
3848 if dev.logical_id[0] == instance.primary_node:
3849 snode = dev.logical_id[1]
3851 snode = dev.logical_id[0]
3854 self.cfg.SetDiskID(dev, snode)
3855 dev_sstatus = rpc.call_blockdev_find(snode, dev)
3860 dev_children = [self._ComputeDiskStatus(instance, snode, child)
3861 for child in dev.children]
3866 "iv_name": dev.iv_name,
3867 "dev_type": dev.dev_type,
3868 "logical_id": dev.logical_id,
3869 "physical_id": dev.physical_id,
3870 "pstatus": dev_pstatus,
3871 "sstatus": dev_sstatus,
3872 "children": dev_children,
3877 def Exec(self, feedback_fn):
3878 """Gather and return data"""
3880 for instance in self.wanted_instances:
# Ask the primary node whether the instance is actually running there.
3881 remote_info = rpc.call_instance_info(instance.primary_node,
3883 if remote_info and "state" in remote_info:
3886 remote_state = "down"
3887 if instance.status == "down":
3888 config_state = "down"
3892 disks = [self._ComputeDiskStatus(instance, None, device)
3893 for device in instance.disks]
3896 "name": instance.name,
3897 "config_state": config_state,
3898 "run_state": remote_state,
3899 "pnode": instance.primary_node,
3900 "snodes": instance.secondary_nodes,
3902 "memory": instance.memory,
3903 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3905 "network_port": instance.network_port,
3906 "vcpus": instance.vcpus,
3907 "kernel_path": instance.kernel_path,
3908 "initrd_path": instance.initrd_path,
3909 "hvm_boot_order": instance.hvm_boot_order,
3912 result[instance.name] = idict
# LU that modifies instance parameters (memory, vcpus, first-NIC ip/bridge/mac,
# kernel/initrd paths, HVM boot order).  Changes are recorded in the config
# and take effect at the instance's next restart.
# NOTE(review): interior lines are missing from this extract (several if/try/
# else headers); code kept byte-identical, comments only added.
3917 class LUSetInstanceParams(LogicalUnit):
3918 """Modifies an instances's parameters.
3921 HPATH = "instance-modify"
3922 HTYPE = constants.HTYPE_INSTANCE
3923 _OP_REQP = ["instance_name"]
# Build the hook environment advertising the *new* values, overriding the
# current instance data where a change was requested.
3925 def BuildHooksEnv(self):
3928 This runs on the master, primary and secondaries.
3933 args['memory'] = self.mem
3935 args['vcpus'] = self.vcpus
3936 if self.do_ip or self.do_bridge or self.mac:
3940 ip = self.instance.nics[0].ip
3942 bridge = self.bridge
3944 bridge = self.instance.nics[0].bridge
3948 mac = self.instance.nics[0].mac
3949 args['nics'] = [(ip, bridge, mac)]
3950 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3951 nl = [self.sstore.GetMasterNode(),
3952 self.instance.primary_node] + list(self.instance.secondary_nodes)
3955 def CheckPrereq(self):
3956 """Check prerequisites.
3958 This only checks the instance list against the existing names.
# All parameters are optional on the opcode; missing ones become None.
3961 self.mem = getattr(self.op, "mem", None)
3962 self.vcpus = getattr(self.op, "vcpus", None)
3963 self.ip = getattr(self.op, "ip", None)
3964 self.mac = getattr(self.op, "mac", None)
3965 self.bridge = getattr(self.op, "bridge", None)
3966 self.kernel_path = getattr(self.op, "kernel_path", None)
3967 self.initrd_path = getattr(self.op, "initrd_path", None)
3968 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
# Reject a no-op request: at least one parameter must be set.
3969 all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
3970 self.kernel_path, self.initrd_path, self.hvm_boot_order]
3971 if all_params.count(None) == len(all_params):
3972 raise errors.OpPrereqError("No changes submitted")
3973 if self.mem is not None:
3975 self.mem = int(self.mem)
3976 except ValueError, err:
3977 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3978 if self.vcpus is not None:
3980 self.vcpus = int(self.vcpus)
3981 except ValueError, err:
3982 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3983 if self.ip is not None:
# The literal string "none" (case-insensitive) means "remove the IP".
3985 if self.ip.lower() == "none":
3988 if not utils.IsValidIP(self.ip):
3989 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3992 self.do_bridge = (self.bridge is not None)
3993 if self.mac is not None:
# A new MAC must be both cluster-unique and syntactically valid.
3994 if self.cfg.IsMacInUse(self.mac):
3995 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
3997 if not utils.IsValidMac(self.mac):
3998 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4000 if self.kernel_path is not None:
4001 self.do_kernel_path = True
4002 if self.kernel_path == constants.VALUE_NONE:
4003 raise errors.OpPrereqError("Can't set instance to no kernel")
4005 if self.kernel_path != constants.VALUE_DEFAULT:
4006 if not os.path.isabs(self.kernel_path):
4007 raise errors.OpPrereqError("The kernel path must be an absolute"
4010 self.do_kernel_path = False
4012 if self.initrd_path is not None:
4013 self.do_initrd_path = True
# Unlike the kernel, the initrd may be VALUE_NONE or VALUE_DEFAULT;
# any other value must be an absolute path.
4014 if self.initrd_path not in (constants.VALUE_NONE,
4015 constants.VALUE_DEFAULT):
4016 if not os.path.isabs(self.initrd_path):
4017 raise errors.OpPrereqError("The initrd path must be an absolute"
4020 self.do_initrd_path = False
4022 # boot order verification
4023 if self.hvm_boot_order is not None:
4024 if self.hvm_boot_order != constants.VALUE_DEFAULT:
# Boot order is a string of device letters drawn from 'a', 'c', 'd', 'n'.
4025 if len(self.hvm_boot_order.strip("acdn")) != 0:
4026 raise errors.OpPrereqError("invalid boot order specified,"
4027 " must be one or more of [acdn]"
4030 instance = self.cfg.GetInstanceInfo(
4031 self.cfg.ExpandInstanceName(self.op.instance_name))
4032 if instance is None:
4033 raise errors.OpPrereqError("No such instance name '%s'" %
4034 self.op.instance_name)
# Canonicalize to the expanded instance name for the rest of the LU.
4035 self.op.instance_name = instance.name
4036 self.instance = instance
4039 def Exec(self, feedback_fn):
4040 """Modifies an instance.
4042 All parameters take effect only at the next restart of the instance.
4045 instance = self.instance
# Apply each requested change and record a (name, value) pair for reporting.
4047 instance.memory = self.mem
4048 result.append(("mem", self.mem))
4050 instance.vcpus = self.vcpus
4051 result.append(("vcpus", self.vcpus))
4053 instance.nics[0].ip = self.ip
4054 result.append(("ip", self.ip))
4056 instance.nics[0].bridge = self.bridge
4057 result.append(("bridge", self.bridge))
4059 instance.nics[0].mac = self.mac
4060 result.append(("mac", self.mac))
4061 if self.do_kernel_path:
4062 instance.kernel_path = self.kernel_path
4063 result.append(("kernel_path", self.kernel_path))
4064 if self.do_initrd_path:
4065 instance.initrd_path = self.initrd_path
4066 result.append(("initrd_path", self.initrd_path))
4067 if self.hvm_boot_order:
# VALUE_DEFAULT maps to None in the config, i.e. hypervisor default.
4068 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4069 instance.hvm_boot_order = None
4071 instance.hvm_boot_order = self.hvm_boot_order
4072 result.append(("hvm_boot_order", self.hvm_boot_order))
# Persist the modified instance object back to the cluster config.
4074 self.cfg.AddInstance(instance)
# LU listing the instance exports (backups) present on a set of nodes.
4079 class LUQueryExports(NoHooksLU):
4080 """Query the exports list
4085 def CheckPrereq(self):
4086 """Check that the nodelist contains only existing nodes.
# An absent/empty node list expands to all cluster nodes via _GetWantedNodes.
4089 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4091 def Exec(self, feedback_fn):
4092 """Compute the list of all the exported system images.
4095 a dictionary with the structure node->(export-list)
4096 where export-list is a list of the instances exported on
4100 return rpc.call_export_list(self.nodes)
# LU exporting an instance to an image on a target node: optionally shut the
# instance down, snapshot its 'sda' LV, copy the snapshot to the target node,
# finalize the export there, and prune older exports of the same instance.
# NOTE(review): interior lines are missing from this extract (else branches,
# continuations); code kept byte-identical, comments only added.
4103 class LUExportInstance(LogicalUnit):
4104 """Export an instance to an image in the cluster.
4107 HPATH = "instance-export"
4108 HTYPE = constants.HTYPE_INSTANCE
4109 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4111 def BuildHooksEnv(self):
4114 This will run on the master, primary node and target node.
4118 "EXPORT_NODE": self.op.target_node,
4119 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4121 env.update(_BuildInstanceHookEnvByObject(self.instance))
4122 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4123 self.op.target_node]
4126 def CheckPrereq(self):
4127 """Check prerequisites.
4129 This checks that the instance name is a valid one.
4132 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4133 self.instance = self.cfg.GetInstanceInfo(instance_name)
4134 if self.instance is None:
4135 raise errors.OpPrereqError("Instance '%s' not found" %
4136 self.op.instance_name)
4139 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4140 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4142 if self.dst_node is None:
4143 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4144 self.op.target_node)
# Canonicalize target_node to the expanded node name.
4145 self.op.target_node = self.dst_node.name
4147 def Exec(self, feedback_fn):
4148 """Export an instance to an image in the cluster.
4151 instance = self.instance
4152 dst_node = self.dst_node
4153 src_node = instance.primary_node
4154 if self.op.shutdown:
4155 # shutdown the instance, but not the disks
4156 if not rpc.call_instance_shutdown(src_node, instance):
4157 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
# NOTE(review): 'source_node' is not defined in this method — the variable
# above is 'src_node'; this error path would raise NameError. Fix candidate.
4158 (instance.name, source_node))
4160 vgname = self.cfg.GetVGName()
4165 for disk in instance.disks:
# Only the first disk ('sda') is snapshotted and exported.
4166 if disk.iv_name == "sda":
4167 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4168 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4170 if not new_dev_name:
4171 logger.Error("could not snapshot block device %s on node %s" %
4172 (disk.logical_id[1], src_node))
4174 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4175 logical_id=(vgname, new_dev_name),
4176 physical_id=(vgname, new_dev_name),
4177 iv_name=disk.iv_name)
4178 snap_disks.append(new_dev)
# Restart the instance after the snapshot if we shut it down above.
4181 if self.op.shutdown and instance.status == "up":
4182 if not rpc.call_instance_start(src_node, instance, None):
4183 _ShutdownInstanceDisks(instance, self.cfg)
4184 raise errors.OpExecError("Could not start instance")
4186 # TODO: check for size
4188 for dev in snap_disks:
4189 if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4191 logger.Error("could not export block device %s from node"
4193 (dev.logical_id[1], src_node, dst_node.name))
# The snapshot LV is temporary; remove it whether or not the export worked.
4194 if not rpc.call_blockdev_remove(src_node, dev):
4195 logger.Error("could not remove snapshot block device %s from"
4196 " node %s" % (dev.logical_id[1], src_node))
4198 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4199 logger.Error("could not finalize export for instance %s on node %s" %
4200 (instance.name, dst_node.name))
4202 nodelist = self.cfg.GetNodeList()
4203 nodelist.remove(dst_node.name)
4205 # on one-node clusters nodelist will be empty after the removal
4206 # if we proceed the backup would be removed because OpQueryExports
4207 # substitutes an empty list with the full cluster node list.
4209 op = opcodes.OpQueryExports(nodes=nodelist)
4210 exportlist = self.proc.ChainOpCode(op)
# Drop stale exports of this instance on all other nodes; failures are
# logged but non-fatal since the new export already exists.
4211 for node in exportlist:
4212 if instance.name in exportlist[node]:
4213 if not rpc.call_export_remove(node, instance.name):
4214 logger.Error("could not remove older export for instance %s"
4215 " on node %s" % (instance.name, node))
# Abstract base for the tag LUs: resolves op.kind/op.name to the tagged
# object (cluster, node, or instance) and stores it in self.target.
4218 class TagsLU(NoHooksLU):
4221 This is an abstract class which is the parent of all the other tags LUs.
4224 def CheckPrereq(self):
4225 """Check prerequisites.
4228 if self.op.kind == constants.TAG_CLUSTER:
4229 self.target = self.cfg.GetClusterInfo()
4230 elif self.op.kind == constants.TAG_NODE:
4231 name = self.cfg.ExpandNodeName(self.op.name)
4233 raise errors.OpPrereqError("Invalid node name (%s)" %
4236 self.target = self.cfg.GetNodeInfo(name)
4237 elif self.op.kind == constants.TAG_INSTANCE:
4238 name = self.cfg.ExpandInstanceName(self.op.name)
4240 raise errors.OpPrereqError("Invalid instance name (%s)" %
4243 self.target = self.cfg.GetInstanceInfo(name)
# Any other kind value is a caller error.
4245 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
# LU returning the tags of the object resolved by TagsLU.CheckPrereq.
4249 class LUGetTags(TagsLU):
4250 """Returns the tags of a given object.
4253 _OP_REQP = ["kind", "name"]
4255 def Exec(self, feedback_fn):
4256 """Returns the tag list.
4259 return self.target.GetTags()
# LU searching all cluster/node/instance tags against a regex pattern,
# returning (path, tag) pairs for every match.
4262 class LUSearchTags(NoHooksLU):
4263 """Searches the tags for a given pattern.
4266 _OP_REQP = ["pattern"]
4268 def CheckPrereq(self):
4269 """Check prerequisites.
4271 This checks the pattern passed for validity by compiling it.
# Compile once here so an invalid pattern fails at prereq time, not mid-scan.
4275 self.re = re.compile(self.op.pattern)
4276 except re.error, err:
4277 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4278 (self.op.pattern, err))
4280 def Exec(self, feedback_fn):
4281 """Returns the tag list.
# Build the list of (path, taggable-object) pairs to scan: the cluster,
# every instance, and every node.
4285 tgts = [("/cluster", cfg.GetClusterInfo())]
4286 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4287 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4288 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4289 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4291 for path, target in tgts:
4292 for tag in target.GetTags():
4293 if self.re.search(tag):
4294 results.append((path, tag))
# LU adding one or more tags to a cluster/node/instance object.
4298 class LUAddTags(TagsLU):
4299 """Sets a tag on a given object.
4302 _OP_REQP = ["kind", "name", "tags"]
4304 def CheckPrereq(self):
4305 """Check prerequisites.
4307 This checks the type and length of the tag name and value.
# Resolve self.target first, then validate every tag before touching it.
4310 TagsLU.CheckPrereq(self)
4311 for tag in self.op.tags:
4312 objects.TaggableObject.ValidateTag(tag)
4314 def Exec(self, feedback_fn):
4319 for tag in self.op.tags:
4320 self.target.AddTag(tag)
4321 except errors.TagError, err:
4322 raise errors.OpExecError("Error while setting tag: %s" % str(err))
# Persisting can race with a concurrent config change; surface as retryable.
4324 self.cfg.Update(self.target)
4325 except errors.ConfigurationError:
4326 raise errors.OpRetryError("There has been a modification to the"
4327 " config file and the operation has been"
4328 " aborted. Please retry.")
# LU removing one or more tags from a cluster/node/instance object.
4331 class LUDelTags(TagsLU):
4332 """Delete a list of tags from a given object.
4335 _OP_REQP = ["kind", "name", "tags"]
4337 def CheckPrereq(self):
4338 """Check prerequisites.
4340 This checks that we have the given tag.
4343 TagsLU.CheckPrereq(self)
4344 for tag in self.op.tags:
4345 objects.TaggableObject.ValidateTag(tag)
# All requested tags must currently exist on the target; report the exact
# missing ones otherwise.  (The <= comparison treats GetTags() as a set —
# presumably it returns one; confirm against objects.TaggableObject.)
4346 del_tags = frozenset(self.op.tags)
4347 cur_tags = self.target.GetTags()
4348 if not del_tags <= cur_tags:
4349 diff_tags = del_tags - cur_tags
4350 diff_names = ["'%s'" % tag for tag in diff_tags]
4352 raise errors.OpPrereqError("Tag(s) %s not found" %
4353 (",".join(diff_names)))
4355 def Exec(self, feedback_fn):
4356 """Remove the tag from the object.
4359 for tag in self.op.tags:
4360 self.target.RemoveTag(tag)
# Persisting can race with a concurrent config change; surface as retryable.
4362 self.cfg.Update(self.target)
4363 except errors.ConfigurationError:
4364 raise errors.OpRetryError("There has been a modification to the"
4365 " config file and the operation has been"
4366 " aborted. Please retry.")
# Test LU: sleeps for op.duration seconds on the master and/or on a list of
# nodes, to exercise the job execution path end-to-end.
4368 class LUTestDelay(NoHooksLU):
4369 """Sleep for a specified amount of time.
4371 This LU sleeps on the master and/or nodes for a specified amoutn of
4375 _OP_REQP = ["duration", "on_master", "on_nodes"]
4377 def CheckPrereq(self):
4378 """Check prerequisites.
4380 This checks that we have a good list of nodes and/or the duration
4385 if self.op.on_nodes:
# Expand/validate the node names up front; empty list means master-only.
4386 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4388 def Exec(self, feedback_fn):
4389 """Do the actual sleep.
4392 if self.op.on_master:
4393 if not utils.TestDelay(self.op.duration):
4394 raise errors.OpExecError("Error during master delay test")
4395 if self.op.on_nodes:
4396 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4398 raise errors.OpExecError("Complete failure from rpc call")
# Per-node results are checked individually so one failing node is reported.
4399 for node, node_result in result.items():
4401 raise errors.OpExecError("Failure during rpc call to node %s,"
4402 " result: %s" % (node, node_result))