4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement CheckPrereq which also fills in the opcode instance
53 with all the fields (even if as None)
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements (REQ_CLUSTER,
58 REQ_MASTER); note that all commands require root permissions
67 def __init__(self, processor, op, cfg, sstore):
68 """Constructor for LogicalUnit.
70 This needs to be overridden in derived classes in order to check op
78 for attr_name in self._OP_REQP:
79 attr_val = getattr(op, attr_name, None)
81 raise errors.OpPrereqError("Required parameter '%s' missing" %
84 if not cfg.IsCluster():
85 raise errors.OpPrereqError("Cluster not initialized yet,"
86 " use 'gnt-cluster init' first.")
88 master = sstore.GetMasterNode()
89 if master != utils.HostInfo().name:
90 raise errors.OpPrereqError("Commands must be run on the master"
93 def CheckPrereq(self):
94 """Check prerequisites for this LU.
96 This method should check that the prerequisites for the execution
97 of this LU are fulfilled. It can do internode communication, but
98 it should be idempotent - no cluster or system changes are
101 The method should raise errors.OpPrereqError in case something is
102 not fulfilled. Its return value is ignored.
104 This method should also update all the parameters of the opcode to
105 their canonical form; e.g. a short node name must be fully
106 expanded after this method has successfully completed (so that
107 hooks, logging, etc. work correctly).
110 raise NotImplementedError
112 def Exec(self, feedback_fn):
115 This method should implement the actual work. It should raise
116 errors.OpExecError for failures that are somewhat dealt with in
120 raise NotImplementedError
122 def BuildHooksEnv(self):
123 """Build hooks environment for this LU.
125 This method should return a three-element tuple consisting of: a dict
126 containing the environment that will be used for running the
127 specific hook for this LU, a list of node names on which the hook
128 should run before the execution, and a list of node names on which
129 the hook should run after the execution.
131 The keys of the dict must not be prefixed with 'GANETI_' as this will
132 be handled in the hooks runner. Also note additional keys will be
133 added by the hooks runner. If the LU doesn't define any
134 environment, an empty dict (and not None) should be returned.
136 No nodes should be returned as an empty list (and not None).
138 Note that if the HPATH for a LU class is None, this function will
142 raise NotImplementedError
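# Illustrative sketch only: a minimal BuildHooksEnv for a hypothetical
# instance-level LU could look like the following (the env key and the node
# name are assumptions, not values defined elsewhere in this module):
#
#   def BuildHooksEnv(self):
#     env = {"INSTANCE_NAME": self.op.instance_name}
#     nl = [self.sstore.GetMasterNode(), "node1.example.com"]
#     return env, nl, nl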
144 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
145 """Notify the LU about the results of its hooks.
147 This method is called every time a hooks phase is executed, and notifies
148 the Logical Unit about the hooks' result. The LU can then use it to alter
149 its result based on the hooks. By default the method does nothing and the
150 previous result is passed back unchanged, but any LU can override this
151 method if it wants to use the local cluster hook-scripts somehow.
154 phase: the hooks phase that has just been run
155 hooks_results: the results of the multi-node hooks rpc call
156 feedback_fn: function to send feedback back to the caller
157 lu_result: the previous result this LU had, or None in the PRE phase.
163 class NoHooksLU(LogicalUnit):
164 """Simple LU which runs no hooks.
166 This LU is intended as a parent for other LogicalUnits which will
167 run no hooks, in order to reduce duplicate code.
174 def _AddHostToEtcHosts(hostname):
175 """Wrapper around utils.SetEtcHostsEntry.
178 hi = utils.HostInfo(name=hostname)
179 utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
182 def _RemoveHostFromEtcHosts(hostname):
183 """Wrapper around utils.RemoveEtcHostsEntry.
186 hi = utils.HostInfo(name=hostname)
187 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
188 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
191 def _GetWantedNodes(lu, nodes):
192 """Returns list of checked and expanded node names.
195 nodes: List of node names (strings); an empty list means all nodes
198 if not isinstance(nodes, list):
199 raise errors.OpPrereqError("Invalid argument type 'nodes'")
205 node = lu.cfg.ExpandNodeName(name)
207 raise errors.OpPrereqError("No such node name '%s'" % name)
211 wanted = lu.cfg.GetNodeList()
212 return utils.NiceSort(wanted)
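# Illustrative usage sketch (the short name "node1" is hypothetical):
#
#   _GetWantedNodes(lu, [])          # -> all configured nodes, NiceSorted
#   _GetWantedNodes(lu, ["node1"])   # -> ["node1.example.com"], name expanded
#   _GetWantedNodes(lu, None)        # -> raises OpPrereqError (not a list)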
215 def _GetWantedInstances(lu, instances):
216 """Returns list of checked and expanded instance names.
219 instances: List of instance names (strings); an empty list means all instances
222 if not isinstance(instances, list):
223 raise errors.OpPrereqError("Invalid argument type 'instances'")
228 for name in instances:
229 instance = lu.cfg.ExpandInstanceName(name)
231 raise errors.OpPrereqError("No such instance name '%s'" % name)
232 wanted.append(instance)
235 wanted = lu.cfg.GetInstanceList()
236 return utils.NiceSort(wanted)
239 def _CheckOutputFields(static, dynamic, selected):
240 """Checks whether all selected fields are valid.
243 static: Static fields
244 dynamic: Dynamic fields
247 static_fields = frozenset(static)
248 dynamic_fields = frozenset(dynamic)
250 all_fields = static_fields | dynamic_fields
252 if not all_fields.issuperset(selected):
253 raise errors.OpPrereqError("Unknown output fields selected: %s"
254 % ",".join(frozenset(selected).
255 difference(all_fields)))
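# Illustrative usage sketch (hypothetical field names):
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])   # ok, returns quietly
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["bogus"])           # raises OpPrereqError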
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259 memory, vcpus, nics):
260 """Builds instance related env variables for hooks from single variables.
263 secondary_nodes: List of secondary nodes as strings
267 "INSTANCE_NAME": name,
268 "INSTANCE_PRIMARY": primary_node,
269 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270 "INSTANCE_OS_TYPE": os_type,
271 "INSTANCE_STATUS": status,
272 "INSTANCE_MEMORY": memory,
273 "INSTANCE_VCPUS": vcpus,
277 nic_count = len(nics)
278 for idx, (ip, bridge, mac) in enumerate(nics):
281 env["INSTANCE_NIC%d_IP" % idx] = ip
282 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
287 env["INSTANCE_NIC_COUNT"] = nic_count
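# Illustrative sketch of the resulting environment for a hypothetical
# single-NIC instance (all values are made up):
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1.example.com", [],
#                         "debian-etch", "up", 128, 1,
#                         [("10.0.0.5", "xen-br0", "aa:00:00:11:22:33")])
#   # -> {"INSTANCE_NAME": "inst1.example.com",
#   #     "INSTANCE_PRIMARY": "node1.example.com",
#   #     "INSTANCE_SECONDARIES": "",
#   #     "INSTANCE_OS_TYPE": "debian-etch",
#   #     "INSTANCE_STATUS": "up",
#   #     "INSTANCE_MEMORY": 128,
#   #     "INSTANCE_VCPUS": 1,
#   #     "INSTANCE_NIC0_IP": "10.0.0.5",
#   #     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#   #     "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#   #     "INSTANCE_NIC_COUNT": 1}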
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293 """Builds instance related env variables for hooks from an object.
296 instance: objects.Instance object of instance
297 override: dict of values to override
300 'name': instance.name,
301 'primary_node': instance.primary_node,
302 'secondary_nodes': instance.secondary_nodes,
303 'os_type': instance.os,
304 'status': instance.status,
305 'memory': instance.memory,
306 'vcpus': instance.vcpus,
307 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
310 args.update(override)
311 return _BuildInstanceHookEnv(**args)
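# Illustrative sketch: entries in the override dict replace the corresponding
# arguments before the environment is built, e.g. (hypothetical call)
#
#   _BuildInstanceHookEnvByObject(instance, override={"status": "down"})
#
# yields the same environment as above but with INSTANCE_STATUS set to "down".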
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315 """Ensure a node has a correct known_hosts entry.
318 fullnode - Fully qualified domain name of host. (str)
319 ip - IPv4 address of host (str)
320 pubkey - the public key of the cluster
323 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
326 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
335 logger.Debug('read %s' % (repr(rawline),))
337 parts = rawline.rstrip('\r\n').split()
339 # Ignore unwanted lines
340 if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341 fields = parts[0].split(',')
346 for spec in [ ip, fullnode ]:
347 if spec not in fields:
352 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353 if haveall and key == pubkey:
355 save_lines.append(rawline)
356 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
359 if havesome and (not haveall or key != pubkey):
361 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
364 save_lines.append(rawline)
367 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
371 save_lines = save_lines + add_lines
373 # Write a new file and replace old.
374 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
376 newfile = os.fdopen(fd, 'w')
378 newfile.write(''.join(save_lines))
381 logger.Debug("Wrote new known_hosts.")
382 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
385 # Simply appending a new line will do the trick.
387 for add in add_lines:
393 def _HasValidVG(vglist, vgname):
394 """Checks if the volume group list is valid.
396 A non-None return value means there's an error, and the return value
397 is the error message.
400 vgsize = vglist.get(vgname, None)
402 return "volume group '%s' missing" % vgname
404 return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
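# Illustrative sketch (made-up volume group names and sizes, in MiB):
#
#   _HasValidVG({"xenvg": 40960}, "xenvg")   # -> None (no error)
#   _HasValidVG({"xenvg": 40960}, "data")    # -> "volume group 'data' missing"
#   _HasValidVG({"xenvg": 1024}, "xenvg")    # -> "... too small ..." message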
409 def _InitSSHSetup(node):
410 """Setup the SSH configuration for the cluster.
413 This generates a dsa keypair for root, adds the pub key to the
414 permitted hosts and adds the hostkey to its own known hosts.
417 node: the name of this host as a fqdn
420 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
422 for name in priv_key, pub_key:
423 if os.path.exists(name):
424 utils.CreateBackup(name)
425 utils.RemoveFile(name)
427 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
431 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
434 f = open(pub_key, 'r')
436 utils.AddAuthorizedKey(auth_keys, f.read(8192))
441 def _InitGanetiServerSetup(ss):
442 """Setup the necessary configuration for the initial node daemon.
444 This creates the nodepass file containing the shared password for
445 the cluster and also generates the SSL certificate.
448 # Create pseudo random password
449 randpass = sha.new(os.urandom(64)).hexdigest()
450 # and write it into sstore
451 ss.SetKey(ss.SS_NODED_PASS, randpass)
453 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454 "-days", str(365*5), "-nodes", "-x509",
455 "-keyout", constants.SSL_CERT_FILE,
456 "-out", constants.SSL_CERT_FILE, "-batch"])
458 raise errors.OpExecError("could not generate server ssl cert, command"
459 " %s had exitcode %s and error message %s" %
460 (result.cmd, result.exit_code, result.output))
462 os.chmod(constants.SSL_CERT_FILE, 0400)
464 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
467 raise errors.OpExecError("Could not start the node daemon, command %s"
468 " had exitcode %s and error %s" %
469 (result.cmd, result.exit_code, result.output))
472 def _CheckInstanceBridgesExist(instance):
473 """Check that the brigdes needed by an instance exist.
476 # check bridges existance
477 brlist = [nic.bridge for nic in instance.nics]
478 if not rpc.call_bridges_exist(instance.primary_node, brlist):
479 raise errors.OpPrereqError("one or more target bridges %s do not"
480 " exist on destination node '%s'" %
481 (brlist, instance.primary_node))
484 class LUInitCluster(LogicalUnit):
485 """Initialise the cluster.
488 HPATH = "cluster-init"
489 HTYPE = constants.HTYPE_CLUSTER
490 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491 "def_bridge", "master_netdev"]
494 def BuildHooksEnv(self):
497 Notes: Since we don't require a cluster, we must manually add
498 ourselves in the post-run node list.
501 env = {"OP_TARGET": self.op.cluster_name}
502 return env, [], [self.hostname.name]
504 def CheckPrereq(self):
505 """Verify that the passed name is a valid one.
508 if config.ConfigWriter.IsCluster():
509 raise errors.OpPrereqError("Cluster is already initialised")
511 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512 if not os.path.exists(constants.VNC_PASSWORD_FILE):
513 raise errors.OpPrereqError("Please prepare the cluster VNC"
515 constants.VNC_PASSWORD_FILE)
517 self.hostname = hostname = utils.HostInfo()
519 if hostname.ip.startswith("127."):
520 raise errors.OpPrereqError("This host's IP resolves to the private"
521 " range (%s). Please fix DNS or %s." %
522 (hostname.ip, constants.ETC_HOSTS))
524 if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525 source=constants.LOCALHOST_IP_ADDRESS):
526 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527 " to %s,\nbut this ip address does not"
528 " belong to this host."
529 " Aborting." % hostname.ip)
531 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
533 if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
535 raise errors.OpPrereqError("Cluster IP already active. Aborting.")
537 secondary_ip = getattr(self.op, "secondary_ip", None)
538 if secondary_ip and not utils.IsValidIP(secondary_ip):
539 raise errors.OpPrereqError("Invalid secondary ip given")
541 secondary_ip != hostname.ip and
542 (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543 source=constants.LOCALHOST_IP_ADDRESS))):
544 raise errors.OpPrereqError("You gave %s as secondary IP,"
545 " but it does not belong to this host." %
547 self.secondary_ip = secondary_ip
549 # checks presence of the volume group given
550 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
553 raise errors.OpPrereqError("Error: %s" % vgstatus)
555 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
557 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
560 if self.op.hypervisor_type not in constants.HYPER_TYPES:
561 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562 self.op.hypervisor_type)
564 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
566 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567 (self.op.master_netdev,
568 result.output.strip()))
570 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572 raise errors.OpPrereqError("Init.d script '%s' missing or not"
573 " executable." % constants.NODE_INITD_SCRIPT)
575 def Exec(self, feedback_fn):
576 """Initialize the cluster.
579 clustername = self.clustername
580 hostname = self.hostname
582 # set up the simple store
583 self.sstore = ss = ssconf.SimpleStore()
584 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
590 # set up the inter-node password and certificate
591 _InitGanetiServerSetup(ss)
593 # start the master ip
594 rpc.call_node_start_master(hostname.name)
596 # set up ssh config and /etc/hosts
597 f = open(constants.SSH_HOST_RSA_PUB, 'r')
602 sshkey = sshline.split(" ")[1]
604 _AddHostToEtcHosts(hostname.name)
606 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
608 _InitSSHSetup(hostname.name)
610 # init of cluster config file
611 self.cfg = cfgw = config.ConfigWriter()
612 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613 sshkey, self.op.mac_prefix,
614 self.op.vg_name, self.op.def_bridge)
617 class LUDestroyCluster(NoHooksLU):
618 """Logical unit for destroying the cluster.
623 def CheckPrereq(self):
624 """Check prerequisites.
626 This checks whether the cluster is empty.
628 Any errors are signalled by raising errors.OpPrereqError.
631 master = self.sstore.GetMasterNode()
633 nodelist = self.cfg.GetNodeList()
634 if len(nodelist) != 1 or nodelist[0] != master:
635 raise errors.OpPrereqError("There are still %d node(s) in"
636 " this cluster." % (len(nodelist) - 1))
637 instancelist = self.cfg.GetInstanceList()
639 raise errors.OpPrereqError("There are still %d instance(s) in"
640 " this cluster." % len(instancelist))
642 def Exec(self, feedback_fn):
643 """Destroys the cluster.
646 master = self.sstore.GetMasterNode()
647 if not rpc.call_node_stop_master(master):
648 raise errors.OpExecError("Could not disable the master role")
649 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650 utils.CreateBackup(priv_key)
651 utils.CreateBackup(pub_key)
652 rpc.call_node_leave_cluster(master)
655 class LUVerifyCluster(LogicalUnit):
656 """Verifies the cluster status.
659 HPATH = "cluster-verify"
660 HTYPE = constants.HTYPE_CLUSTER
661 _OP_REQP = ["skip_checks"]
663 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664 remote_version, feedback_fn):
665 """Run multiple tests against a node.
668 - compares ganeti version
669 - checks vg existence and size > 20G
670 - checks config file checksum
671 - checks ssh to other nodes
674 node: name of the node to check
675 file_list: required list of files
676 local_cksum: dictionary of local files and their checksums
679 # compares ganeti version
680 local_version = constants.PROTOCOL_VERSION
681 if not remote_version:
682 feedback_fn(" - ERROR: connection to %s failed" % (node))
685 if local_version != remote_version:
686 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
687 (local_version, node, remote_version))
690 # checks vg existence and size > 20G
694 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
698 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
700 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
703 # checks config file checksum
706 if 'filelist' not in node_result:
708 feedback_fn(" - ERROR: node hasn't returned file checksum data")
710 remote_cksum = node_result['filelist']
711 for file_name in file_list:
712 if file_name not in remote_cksum:
714 feedback_fn(" - ERROR: file '%s' missing" % file_name)
715 elif remote_cksum[file_name] != local_cksum[file_name]:
717 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
719 if 'nodelist' not in node_result:
721 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
723 if node_result['nodelist']:
725 for node in node_result['nodelist']:
726 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
727 (node, node_result['nodelist'][node]))
728 if 'node-net-test' not in node_result:
730 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
732 if node_result['node-net-test']:
734 nlist = utils.NiceSort(node_result['node-net-test'].keys())
736 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
737 (node, node_result['node-net-test'][node]))
739 hyp_result = node_result.get('hypervisor', None)
740 if hyp_result is not None:
741 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
744 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
745 node_instance, feedback_fn):
746 """Verify an instance.
748 This function checks to see if the required block devices are
749 available on the instance's node.
754 node_current = instanceconfig.primary_node
757 instanceconfig.MapLVsByNode(node_vol_should)
759 for node in node_vol_should:
760 for volume in node_vol_should[node]:
761 if node not in node_vol_is or volume not in node_vol_is[node]:
762 feedback_fn(" - ERROR: volume %s missing on node %s" %
766 if instanceconfig.status != 'down':
767 if (node_current not in node_instance or
768 not instance in node_instance[node_current]):
769 feedback_fn(" - ERROR: instance %s not running on node %s" %
770 (instance, node_current))
773 for node in node_instance:
774 if node != node_current:
775 if instance in node_instance[node]:
776 feedback_fn(" - ERROR: instance %s should not run on node %s" %
782 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
783 """Verify if there are any unknown volumes in the cluster.
785 The .os, .swap and backup volumes are ignored. All other volumes are
791 for node in node_vol_is:
792 for volume in node_vol_is[node]:
793 if node not in node_vol_should or volume not in node_vol_should[node]:
794 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
799 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
800 """Verify the list of running instances.
802 This checks what instances are running but unknown to the cluster.
806 for node in node_instance:
807 for runninginstance in node_instance[node]:
808 if runninginstance not in instancelist:
809 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
810 (runninginstance, node))
814 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
815 """Verify N+1 Memory Resilience.
817 Check that if one single node dies we can still start all the instances it
823 for node, nodeinfo in node_info.iteritems():
824 # This code checks that every node which is now listed as secondary has
825 # enough memory to host all instances it is supposed to should a single
826 # other node in the cluster fail.
827 # FIXME: not ready for failover to an arbitrary node
828 # FIXME: does not support file-backed instances
829 # WARNING: we currently take into account down instances as well as up
830 # ones, considering that even if they're down someone might want to start
831 # them even in the event of a node failure.
832 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
834 for instance in instances:
835 needed_mem += instance_cfg[instance].memory
836 if nodeinfo['mfree'] < needed_mem:
837 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
838 " failovers should node %s fail" % (node, prinode))
842 def CheckPrereq(self):
843 """Check prerequisites.
845 Transform the list of checks we're going to skip into a set and check that
846 all its members are valid.
849 self.skip_set = frozenset(self.op.skip_checks)
850 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
851 raise errors.OpPrereqError("Invalid checks to be skipped specified")
853 def BuildHooksEnv(self):
856 Cluster-Verify hooks just run in the post phase, and their failure causes
857 their output to be logged in the verify output and the verification to fail.
860 all_nodes = self.cfg.GetNodeList()
861 tags = self.cfg.GetClusterInfo().GetTags()
862 # TODO: populate the environment with useful information for verify hooks
864 "CLUSTER_TAGS": " ".join(tags),
866 return env, [], all_nodes
868 def Exec(self, feedback_fn):
869 """Verify integrity of cluster, performing various test on nodes.
873 feedback_fn("* Verifying global settings")
874 for msg in self.cfg.VerifyConfig():
875 feedback_fn(" - ERROR: %s" % msg)
877 vg_name = self.cfg.GetVGName()
878 nodelist = utils.NiceSort(self.cfg.GetNodeList())
879 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
880 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
881 i_non_redundant = [] # Non redundant instances
887 # FIXME: verify OS list
889 file_names = list(self.sstore.GetFileList())
890 file_names.append(constants.SSL_CERT_FILE)
891 file_names.append(constants.CLUSTER_CONF_FILE)
892 local_checksums = utils.FingerprintFiles(file_names)
894 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
895 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
896 all_instanceinfo = rpc.call_instance_list(nodelist)
897 all_vglist = rpc.call_vg_list(nodelist)
898 node_verify_param = {
899 'filelist': file_names,
900 'nodelist': nodelist,
902 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
903 for node in nodeinfo]
905 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
906 all_rversion = rpc.call_version(nodelist)
907 all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
909 for node in nodelist:
910 feedback_fn("* Verifying node %s" % node)
911 result = self._VerifyNode(node, file_names, local_checksums,
912 all_vglist[node], all_nvinfo[node],
913 all_rversion[node], feedback_fn)
917 volumeinfo = all_volumeinfo[node]
919 if isinstance(volumeinfo, basestring):
920 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
921 (node, volumeinfo[-400:].encode('string_escape')))
923 node_volume[node] = {}
924 elif not isinstance(volumeinfo, dict):
925 feedback_fn(" - ERROR: connection to %s failed" % (node,))
929 node_volume[node] = volumeinfo
932 nodeinstance = all_instanceinfo[node]
933 if type(nodeinstance) != list:
934 feedback_fn(" - ERROR: connection to %s failed" % (node,))
938 node_instance[node] = nodeinstance
941 nodeinfo = all_ninfo[node]
942 if not isinstance(nodeinfo, dict):
943 feedback_fn(" - ERROR: connection to %s failed" % (node,))
949 "mfree": int(nodeinfo['memory_free']),
950 "dfree": int(nodeinfo['vg_free']),
953 # dictionary holding all instances this node is secondary for,
954 # grouped by their primary node. Each key is a cluster node, and each
955 # value is a list of instances which have the key as primary and the
956 # current node as secondary. This is handy to calculate N+1 memory
957 # availability if you can only failover from a primary to its
959 "sinst-by-pnode": {},
962 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
968 for instance in instancelist:
969 feedback_fn("* Verifying instance %s" % instance)
970 inst_config = self.cfg.GetInstanceInfo(instance)
971 result = self._VerifyInstance(instance, inst_config, node_volume,
972 node_instance, feedback_fn)
975 inst_config.MapLVsByNode(node_vol_should)
977 instance_cfg[instance] = inst_config
979 pnode = inst_config.primary_node
980 if pnode in node_info:
981 node_info[pnode]['pinst'].append(instance)
983 feedback_fn(" - ERROR: instance %s, connection to primary node"
984 " %s failed" % (instance, pnode))
987 # If the instance is non-redundant we cannot survive losing its primary
988 # node, so we are not N+1 compliant. On the other hand we have no disk
989 # templates with more than one secondary so that situation is not well
991 # FIXME: does not support file-backed instances
992 if len(inst_config.secondary_nodes) == 0:
993 i_non_redundant.append(instance)
994 elif len(inst_config.secondary_nodes) > 1:
995 feedback_fn(" - WARNING: multiple secondaries for instance %s"
998 for snode in inst_config.secondary_nodes:
999 if snode in node_info:
1000 node_info[snode]['sinst'].append(instance)
1001 if pnode not in node_info[snode]['sinst-by-pnode']:
1002 node_info[snode]['sinst-by-pnode'][pnode] = []
1003 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1005 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1006 " %s failed" % (instance, snode))
1008 feedback_fn("* Verifying orphan volumes")
1009 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1013 feedback_fn("* Verifying remaining instances")
1014 result = self._VerifyOrphanInstances(instancelist, node_instance,
1018 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1019 feedback_fn("* Verifying N+1 Memory redundancy")
1020 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1023 feedback_fn("* Other Notes")
1025 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1026 % len(i_non_redundant))
1030 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1031 """Analize the post-hooks' result, handle it, and send some
1032 nicely-formatted feedback back to the user.
1035 phase: the hooks phase that has just been run
1036 hooks_results: the results of the multi-node hooks rpc call
1037 feedback_fn: function to send feedback back to the caller
1038 lu_result: previous Exec result
1041 # We only really run POST phase hooks, and are only interested in their results
1042 if phase == constants.HOOKS_PHASE_POST:
1043 # Used to change hooks' output to proper indentation
1044 indent_re = re.compile('^', re.M)
1045 feedback_fn("* Hooks Results")
1046 if not hooks_results:
1047 feedback_fn(" - ERROR: general communication failure")
1050 for node_name in hooks_results:
1051 show_node_header = True
1052 res = hooks_results[node_name]
1053 if res is False or not isinstance(res, list):
1054 feedback_fn(" Communication failure")
1057 for script, hkr, output in res:
1058 if hkr == constants.HKR_FAIL:
1059 # The node header is only shown once, if there are
1060 # failing hooks on that node
1061 if show_node_header:
1062 feedback_fn(" Node %s:" % node_name)
1063 show_node_header = False
1064 feedback_fn(" ERROR: Script %s failed, output:" % script)
1065 output = indent_re.sub(' ', output)
1066 feedback_fn("%s" % output)
1072 class LUVerifyDisks(NoHooksLU):
1073 """Verifies the cluster disks status.
1078 def CheckPrereq(self):
1079 """Check prerequisites.
1081 This has no prerequisites.
1086 def Exec(self, feedback_fn):
1087 """Verify integrity of cluster disks.
1090 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1092 vg_name = self.cfg.GetVGName()
1093 nodes = utils.NiceSort(self.cfg.GetNodeList())
1094 instances = [self.cfg.GetInstanceInfo(name)
1095 for name in self.cfg.GetInstanceList()]
1098 for inst in instances:
1100 if (inst.status != "up" or
1101 inst.disk_template not in constants.DTS_NET_MIRROR):
1103 inst.MapLVsByNode(inst_lvs)
1104 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1105 for node, vol_list in inst_lvs.iteritems():
1106 for vol in vol_list:
1107 nv_dict[(node, vol)] = inst
1112 node_lvs = rpc.call_volume_list(nodes, vg_name)
1117 lvs = node_lvs[node]
1119 if isinstance(lvs, basestring):
1120 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1121 res_nlvm[node] = lvs
1122 elif not isinstance(lvs, dict):
1123 logger.Info("connection to node %s failed or invalid data returned" %
1125 res_nodes.append(node)
1128 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1129 inst = nv_dict.pop((node, lv_name), None)
1130 if (not lv_online and inst is not None
1131 and inst.name not in res_instances):
1132 res_instances.append(inst.name)
1134 # any leftover items in nv_dict are missing LVs, let's arrange the
1136 for key, inst in nv_dict.iteritems():
1137 if inst.name not in res_missing:
1138 res_missing[inst.name] = []
1139 res_missing[inst.name].append(key)
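# Illustrative sketch of the tuple built by LUVerifyDisks.Exec (made-up node,
# instance and LV names):
#
#   (["node3"],                              # nodes that could not be contacted
#    {"node2": "error text from lvs"},       # per-node LVM errors
#    ["inst1"],                              # instances with offline mirror LVs
#    {"inst2": [("node1", "lv_disk0")]})     # missing LVs, grouped per instance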
1144 class LURenameCluster(LogicalUnit):
1145 """Rename the cluster.
1148 HPATH = "cluster-rename"
1149 HTYPE = constants.HTYPE_CLUSTER
1152 def BuildHooksEnv(self):
1157 "OP_TARGET": self.sstore.GetClusterName(),
1158 "NEW_NAME": self.op.name,
1160 mn = self.sstore.GetMasterNode()
1161 return env, [mn], [mn]
1163 def CheckPrereq(self):
1164 """Verify that the passed name is a valid one.
1167 hostname = utils.HostInfo(self.op.name)
1169 new_name = hostname.name
1170 self.ip = new_ip = hostname.ip
1171 old_name = self.sstore.GetClusterName()
1172 old_ip = self.sstore.GetMasterIP()
1173 if new_name == old_name and new_ip == old_ip:
1174 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1175 " cluster has changed")
1176 if new_ip != old_ip:
1177 result = utils.RunCmd(["fping", "-q", new_ip])
1178 if not result.failed:
1179 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1180 " reachable on the network. Aborting." %
1183 self.op.name = new_name
1185 def Exec(self, feedback_fn):
1186 """Rename the cluster.
1189 clustername = self.op.name
1193 # shutdown the master IP
1194 master = ss.GetMasterNode()
1195 if not rpc.call_node_stop_master(master):
1196 raise errors.OpExecError("Could not disable the master role")
1200 ss.SetKey(ss.SS_MASTER_IP, ip)
1201 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1203 # Distribute updated ss config to all nodes
1204 myself = self.cfg.GetNodeInfo(master)
1205 dist_nodes = self.cfg.GetNodeList()
1206 if myself.name in dist_nodes:
1207 dist_nodes.remove(myself.name)
1209 logger.Debug("Copying updated ssconf data to all nodes")
1210 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1211 fname = ss.KeyToFilename(keyname)
1212 result = rpc.call_upload_file(dist_nodes, fname)
1213 for to_node in dist_nodes:
1214 if not result[to_node]:
1215 logger.Error("copy of file %s to node %s failed" %
1218 if not rpc.call_node_start_master(master):
1219 logger.Error("Could not re-enable the master role on the master,"
1220 " please restart manually.")
1223 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1224 """Sleep and poll for an instance's disk to sync.
1227 if not instance.disks:
1231 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1233 node = instance.primary_node
1235 for dev in instance.disks:
1236 cfgw.SetDiskID(dev, node)
1242 cumul_degraded = False
1243 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1245 proc.LogWarning("Can't get any data from node %s" % node)
1248 raise errors.RemoteError("Can't contact node %s for mirror data,"
1249 " aborting." % node)
1253 for i in range(len(rstats)):
1256 proc.LogWarning("Can't compute data for node %s/%s" %
1257 (node, instance.disks[i].iv_name))
1259 # we ignore the ldisk parameter
1260 perc_done, est_time, is_degraded, _ = mstat
1261 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1262 if perc_done is not None:
1264 if est_time is not None:
1265 rem_time = "%d estimated seconds remaining" % est_time
1268 rem_time = "no time estimate"
1269 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1270 (instance.disks[i].iv_name, perc_done, rem_time))
1277 time.sleep(min(60, max_time))
1283 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1284 return not cumul_degraded
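# Illustrative sketch: each entry returned by call_blockdev_getmirrorstatus is
# unpacked above as (perc_done, est_time, is_degraded, ldisk); a device at
# 42.5% with 300 seconds of resync left could report (42.5, 300, True, False)
# (made-up numbers; the ldisk flag is ignored here).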
1287 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1288 """Check that mirrors are not degraded.
1290 The ldisk parameter, if True, will change the test from the
1291 is_degraded attribute (which represents overall non-ok status for
1292 the device(s)) to the ldisk (representing the local storage status).
1295 cfgw.SetDiskID(dev, node)
1302 if on_primary or dev.AssembleOnSecondary():
1303 rstats = rpc.call_blockdev_find(node, dev)
1305 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1308 result = result and (not rstats[idx])
1310 for child in dev.children:
1311 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1316 class LUDiagnoseOS(NoHooksLU):
1317 """Logical unit for OS diagnose/query.
1320 _OP_REQP = ["output_fields", "names"]
1322 def CheckPrereq(self):
1323 """Check prerequisites.
1325 This always succeeds, since this is a pure query LU.
1329 raise errors.OpPrereqError("Selective OS query not supported")
1331 self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1332 _CheckOutputFields(static=[],
1333 dynamic=self.dynamic_fields,
1334 selected=self.op.output_fields)
1337 def _DiagnoseByOS(node_list, rlist):
1338 """Remaps a per-node return list into an a per-os per-node dictionary
1341 node_list: a list with the names of all nodes
1342 rlist: a map with node names as keys and OS objects as values
1345 map: a map with OS names as keys and as values another map, with
1347 keys and list of OS objects as values
1348 e.g. {"debian-etch": {"node1": [<object>,...],
1349 "node2": [<object>,]}
1354 for node_name, nr in rlist.iteritems():
1358 if os.name not in all_os:
1359 # build a list of nodes for this os containing empty lists
1360 # for each node in node_list
1361 all_os[os.name] = {}
1362 for nname in node_list:
1363 all_os[os.name][nname] = []
1364 all_os[os.name][node_name].append(os)
1367 def Exec(self, feedback_fn):
1368 """Compute the list of OSes.
1371 node_list = self.cfg.GetNodeList()
1372 node_data = rpc.call_os_diagnose(node_list)
1373 if node_data == False:
1374 raise errors.OpExecError("Can't gather the list of OSes")
1375 pol = self._DiagnoseByOS(node_list, node_data)
1377 for os_name, os_data in pol.iteritems():
1379 for field in self.op.output_fields:
1382 elif field == "valid":
1383 val = utils.all([osl and osl[0] for osl in os_data.values()])
1384 elif field == "node_status":
1386 for node_name, nos_list in os_data.iteritems():
1387 val[node_name] = [(v.status, v.path) for v in nos_list]
1389 raise errors.ParameterError(field)
1396 class LURemoveNode(LogicalUnit):
1397 """Logical unit for removing a node.
1400 HPATH = "node-remove"
1401 HTYPE = constants.HTYPE_NODE
1402 _OP_REQP = ["node_name"]
1404 def BuildHooksEnv(self):
1407 This doesn't run on the target node in the pre phase as a failed
1408 node would not allow itself to run.
1412 "OP_TARGET": self.op.node_name,
1413 "NODE_NAME": self.op.node_name,
1415 all_nodes = self.cfg.GetNodeList()
1416 all_nodes.remove(self.op.node_name)
1417 return env, all_nodes, all_nodes
1419 def CheckPrereq(self):
1420 """Check prerequisites.
1423 - the node exists in the configuration
1424 - it does not have primary or secondary instances
1425 - it's not the master
1427 Any errors are signalled by raising errors.OpPrereqError.
1430 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1432 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1434 instance_list = self.cfg.GetInstanceList()
1436 masternode = self.sstore.GetMasterNode()
1437 if node.name == masternode:
1438 raise errors.OpPrereqError("Node is the master node,"
1439 " you need to failover first.")
1441 for instance_name in instance_list:
1442 instance = self.cfg.GetInstanceInfo(instance_name)
1443 if node.name == instance.primary_node:
1444 raise errors.OpPrereqError("Instance %s still running on the node,"
1445 " please remove first." % instance_name)
1446 if node.name in instance.secondary_nodes:
1447 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1448 " please remove first." % instance_name)
1449 self.op.node_name = node.name
1452 def Exec(self, feedback_fn):
1453 """Removes the node from the cluster.
1457 logger.Info("stopping the node daemon and removing configs from node %s" %
1460 rpc.call_node_leave_cluster(node.name)
1462 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1464 logger.Info("Removing node %s from config" % node.name)
1466 self.cfg.RemoveNode(node.name)
1468 _RemoveHostFromEtcHosts(node.name)
1471 class LUQueryNodes(NoHooksLU):
1472 """Logical unit for querying nodes.
1475 _OP_REQP = ["output_fields", "names"]
1477 def CheckPrereq(self):
1478 """Check prerequisites.
1480 This checks that the fields required are valid output fields.
1483 self.dynamic_fields = frozenset([
1485 "mtotal", "mnode", "mfree",
1490 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1491 "pinst_list", "sinst_list",
1493 dynamic=self.dynamic_fields,
1494 selected=self.op.output_fields)
1496 self.wanted = _GetWantedNodes(self, self.op.names)
1498 def Exec(self, feedback_fn):
1499 """Computes the list of nodes and their attributes.
1502 nodenames = self.wanted
1503 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1505 # begin data gathering
1507 if self.dynamic_fields.intersection(self.op.output_fields):
1509 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1510 for name in nodenames:
1511 nodeinfo = node_data.get(name, None)
1514 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1515 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1516 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1517 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1518 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1519 "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1520 "bootid": nodeinfo['bootid'],
1523 live_data[name] = {}
1525 live_data = dict.fromkeys(nodenames, {})
1527 node_to_primary = dict([(name, set()) for name in nodenames])
1528 node_to_secondary = dict([(name, set()) for name in nodenames])
1530 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1531 "sinst_cnt", "sinst_list"))
1532 if inst_fields & frozenset(self.op.output_fields):
1533 instancelist = self.cfg.GetInstanceList()
1535 for instance_name in instancelist:
1536 inst = self.cfg.GetInstanceInfo(instance_name)
1537 if inst.primary_node in node_to_primary:
1538 node_to_primary[inst.primary_node].add(inst.name)
1539 for secnode in inst.secondary_nodes:
1540 if secnode in node_to_secondary:
1541 node_to_secondary[secnode].add(inst.name)
1543 # end data gathering
1546 for node in nodelist:
1548 for field in self.op.output_fields:
1551 elif field == "pinst_list":
1552 val = list(node_to_primary[node.name])
1553 elif field == "sinst_list":
1554 val = list(node_to_secondary[node.name])
1555 elif field == "pinst_cnt":
1556 val = len(node_to_primary[node.name])
1557 elif field == "sinst_cnt":
1558 val = len(node_to_secondary[node.name])
1559 elif field == "pip":
1560 val = node.primary_ip
1561 elif field == "sip":
1562 val = node.secondary_ip
1563 elif field in self.dynamic_fields:
1564 val = live_data[node.name].get(field, None)
1566 raise errors.ParameterError(field)
1567 node_output.append(val)
1568 output.append(node_output)
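# Illustrative sketch: for output_fields ["name", "pinst_cnt", "mfree"] a row
# appended above could look like ["node1.example.com", 2, 1024]
# (made-up values).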
1573 class LUQueryNodeVolumes(NoHooksLU):
1574 """Logical unit for getting volumes on node(s).
1577 _OP_REQP = ["nodes", "output_fields"]
1579 def CheckPrereq(self):
1580 """Check prerequisites.
1582 This checks that the fields required are valid output fields.
1585 self.nodes = _GetWantedNodes(self, self.op.nodes)
1587 _CheckOutputFields(static=["node"],
1588 dynamic=["phys", "vg", "name", "size", "instance"],
1589 selected=self.op.output_fields)
1592 def Exec(self, feedback_fn):
1593 """Computes the list of nodes and their attributes.
1596 nodenames = self.nodes
1597 volumes = rpc.call_node_volumes(nodenames)
1599 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1600 in self.cfg.GetInstanceList()]
1602 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1605 for node in nodenames:
1606 if node not in volumes or not volumes[node]:
1609 node_vols = volumes[node][:]
1610 node_vols.sort(key=lambda vol: vol['dev'])
1612 for vol in node_vols:
1614 for field in self.op.output_fields:
1617 elif field == "phys":
1621 elif field == "name":
1623 elif field == "size":
1624 val = int(float(vol['size']))
1625 elif field == "instance":
1627 if node not in lv_by_node[inst]:
1629 if vol['name'] in lv_by_node[inst][node]:
1635 raise errors.ParameterError(field)
1636 node_output.append(str(val))
1638 output.append(node_output)
1643 class LUAddNode(LogicalUnit):
1644 """Logical unit for adding node to the cluster.
1648 HTYPE = constants.HTYPE_NODE
1649 _OP_REQP = ["node_name"]
1651 def BuildHooksEnv(self):
1654 This will run on all nodes before, and on all nodes + the new node after.
1658 "OP_TARGET": self.op.node_name,
1659 "NODE_NAME": self.op.node_name,
1660 "NODE_PIP": self.op.primary_ip,
1661 "NODE_SIP": self.op.secondary_ip,
1663 nodes_0 = self.cfg.GetNodeList()
1664 nodes_1 = nodes_0 + [self.op.node_name, ]
1665 return env, nodes_0, nodes_1
1667 def CheckPrereq(self):
1668 """Check prerequisites.
1671 - the new node is not already in the config
1673 - its parameters (single/dual homed) match the cluster
1675 Any errors are signalled by raising errors.OpPrereqError.
1678 node_name = self.op.node_name
1681 dns_data = utils.HostInfo(node_name)
1683 node = dns_data.name
1684 primary_ip = self.op.primary_ip = dns_data.ip
1685 secondary_ip = getattr(self.op, "secondary_ip", None)
1686 if secondary_ip is None:
1687 secondary_ip = primary_ip
1688 if not utils.IsValidIP(secondary_ip):
1689 raise errors.OpPrereqError("Invalid secondary IP given")
1690 self.op.secondary_ip = secondary_ip
1692 node_list = cfg.GetNodeList()
1693 if not self.op.readd and node in node_list:
1694 raise errors.OpPrereqError("Node %s is already in the configuration" %
1696 elif self.op.readd and node not in node_list:
1697 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1699 for existing_node_name in node_list:
1700 existing_node = cfg.GetNodeInfo(existing_node_name)
1702 if self.op.readd and node == existing_node_name:
1703 if (existing_node.primary_ip != primary_ip or
1704 existing_node.secondary_ip != secondary_ip):
1705 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1706 " address configuration as before")
1709 if (existing_node.primary_ip == primary_ip or
1710 existing_node.secondary_ip == primary_ip or
1711 existing_node.primary_ip == secondary_ip or
1712 existing_node.secondary_ip == secondary_ip):
1713 raise errors.OpPrereqError("New node ip address(es) conflict with"
1714 " existing node %s" % existing_node.name)
1716 # check that the type of the node (single versus dual homed) is the
1717 # same as for the master
1718 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1719 master_singlehomed = myself.secondary_ip == myself.primary_ip
1720 newbie_singlehomed = secondary_ip == primary_ip
1721 if master_singlehomed != newbie_singlehomed:
1722 if master_singlehomed:
1723 raise errors.OpPrereqError("The master has no private ip but the"
1724 " new node has one")
1726 raise errors.OpPrereqError("The master has a private ip but the"
1727 " new node doesn't have one")
1729 # checks reachability
1730 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1731 raise errors.OpPrereqError("Node not reachable by ping")
1733 if not newbie_singlehomed:
1734 # check reachability from my secondary ip to newbie's secondary ip
1735 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1736 source=myself.secondary_ip):
1737 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1738 " based ping to noded port")
1740 self.new_node = objects.Node(name=node,
1741 primary_ip=primary_ip,
1742 secondary_ip=secondary_ip)
1744 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1745 if not os.path.exists(constants.VNC_PASSWORD_FILE):
1746 raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1747 constants.VNC_PASSWORD_FILE)
1749 def Exec(self, feedback_fn):
1750 """Adds the new node to the cluster.
1753 new_node = self.new_node
1754 node = new_node.name
1756 # set up inter-node password and certificate and restarts the node daemon
1757 gntpass = self.sstore.GetNodeDaemonPassword()
1758 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1759 raise errors.OpExecError("ganeti password corruption detected")
1760 f = open(constants.SSL_CERT_FILE)
1762 gntpem = f.read(8192)
1765 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1766 # so we use this to detect an invalid certificate; as long as the
1767 # cert doesn't contain this, the here-document will be correctly
1768 # parsed by the shell sequence below
1769 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1770 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1771 if not gntpem.endswith("\n"):
1772 raise errors.OpExecError("PEM must end with newline")
1773 logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1775 # and then connect with ssh to set password and start ganeti-noded
1776 # note that all the below variables are sanitized at this point,
1777 # either by being constants or by the checks above
1779 mycommand = ("umask 077 && "
1780 "echo '%s' > '%s' && "
1781 "cat > '%s' << '!EOF.' && \n"
1782 "%s!EOF.\n%s restart" %
1783 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1784 constants.SSL_CERT_FILE, gntpem,
1785 constants.NODE_INITD_SCRIPT))
1787 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1789 raise errors.OpExecError("Remote command on node %s, error: %s,"
1791 (node, result.fail_reason, result.output))
1793 # check connectivity
1796 result = rpc.call_version([node])[node]
1798 if constants.PROTOCOL_VERSION == result:
1799 logger.Info("communication to node %s fine, sw version %s match" %
1802 raise errors.OpExecError("Version mismatch master version %s,"
1803 " node version %s" %
1804 (constants.PROTOCOL_VERSION, result))
1806 raise errors.OpExecError("Cannot get version from the new node")
1809 logger.Info("copy ssh key to node %s" % node)
1810 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1812 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1813 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1819 keyarray.append(f.read())
1823 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1824 keyarray[3], keyarray[4], keyarray[5])
1827 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1829 # Add node to our /etc/hosts, and add key to known_hosts
1830 _AddHostToEtcHosts(new_node.name)
1832 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1833 self.cfg.GetHostKey())
1835 if new_node.secondary_ip != new_node.primary_ip:
1836 if not rpc.call_node_tcp_ping(new_node.name,
1837 constants.LOCALHOST_IP_ADDRESS,
1838 new_node.secondary_ip,
1839 constants.DEFAULT_NODED_PORT,
1841 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1842 " you gave (%s). Please fix and re-run this"
1843 " command." % new_node.secondary_ip)
1845 success, msg = ssh.VerifyNodeHostname(node)
1847 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1848 " than the one the resolver gives: %s."
1849 " Please fix and re-run this command." %
1852 # Distribute updated /etc/hosts and known_hosts to all nodes,
1853 # including the node just added
1854 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1855 dist_nodes = self.cfg.GetNodeList()
1856 if not self.op.readd:
1857 dist_nodes.append(node)
1858 if myself.name in dist_nodes:
1859 dist_nodes.remove(myself.name)
1861 logger.Debug("Copying hosts and known_hosts to all nodes")
1862 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1863 result = rpc.call_upload_file(dist_nodes, fname)
1864 for to_node in dist_nodes:
1865 if not result[to_node]:
1866 logger.Error("copy of file %s to node %s failed" %
1869 to_copy = ss.GetFileList()
1870 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1871 to_copy.append(constants.VNC_PASSWORD_FILE)
1872 for fname in to_copy:
1873 if not ssh.CopyFileToNode(node, fname):
1874 logger.Error("could not copy file %s to node %s" % (fname, node))
1876 if not self.op.readd:
1877 logger.Info("adding node %s to cluster.conf" % node)
1878 self.cfg.AddNode(new_node)
1881 class LUMasterFailover(LogicalUnit):
1882 """Failover the master node to the current node.
1884 This is a special LU in that it must run on a non-master node.
1887 HPATH = "master-failover"
1888 HTYPE = constants.HTYPE_CLUSTER
1892 def BuildHooksEnv(self):
1895 This will run on the new master only in the pre phase, and on all
1896 the nodes in the post phase.
1900 "OP_TARGET": self.new_master,
1901 "NEW_MASTER": self.new_master,
1902 "OLD_MASTER": self.old_master,
1904 return env, [self.new_master], self.cfg.GetNodeList()
1906 def CheckPrereq(self):
1907 """Check prerequisites.
1909 This checks that we are not already the master.
1912 self.new_master = utils.HostInfo().name
1913 self.old_master = self.sstore.GetMasterNode()
1915 if self.old_master == self.new_master:
1916 raise errors.OpPrereqError("This command must be run on the node"
1917 " where you want the new master to be."
1918 " %s is already the master" %
1921 def Exec(self, feedback_fn):
1922 """Failover the master node.
1924 This command, when run on a non-master node, will cause the current
1925 master to cease being master, and the non-master to become new
1929 #TODO: do not rely on gethostname returning the FQDN
1930 logger.Info("setting master to %s, old master: %s" %
1931 (self.new_master, self.old_master))
1933 if not rpc.call_node_stop_master(self.old_master):
1934 logger.Error("could not disable the master role on the old master"
1935 " %s, please disable manually" % self.old_master)
1938 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1939 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1940 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1941 logger.Error("could not distribute the new simple store master file"
1942 " to the other nodes, please check.")
1944 if not rpc.call_node_start_master(self.new_master):
1945 logger.Error("could not start the master role on the new master"
1946 " %s, please check" % self.new_master)
1947 feedback_fn("Error in activating the master IP on the new master,"
1948 " please fix manually.")
1952 class LUQueryClusterInfo(NoHooksLU):
1953 """Query cluster configuration.
1959 def CheckPrereq(self):
1960 """No prerequsites needed for this LU.
1965 def Exec(self, feedback_fn):
1966 """Return cluster config.
1970 "name": self.sstore.GetClusterName(),
1971 "software_version": constants.RELEASE_VERSION,
1972 "protocol_version": constants.PROTOCOL_VERSION,
1973 "config_version": constants.CONFIG_VERSION,
1974 "os_api_version": constants.OS_API_VERSION,
1975 "export_version": constants.EXPORT_VERSION,
1976 "master": self.sstore.GetMasterNode(),
1977 "architecture": (platform.architecture()[0], platform.machine()),
1978 "hypervisor_type": self.sstore.GetHypervisorType(),
1984 class LUClusterCopyFile(NoHooksLU):
1985 """Copy file to cluster.
1988 _OP_REQP = ["nodes", "filename"]
1990 def CheckPrereq(self):
1991 """Check prerequisites.
1993 It should check that the named file exists and that the given list
1997 if not os.path.exists(self.op.filename):
1998 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2000 self.nodes = _GetWantedNodes(self, self.op.nodes)
2002 def Exec(self, feedback_fn):
2003 """Copy a file from master to some nodes.
2006 opts - class with options as members
2007 args - list containing a single element, the file name
2009 nodes - list containing the name of target nodes; if empty, all nodes
2012 filename = self.op.filename
2014 myname = utils.HostInfo().name
2016 for node in self.nodes:
2019 if not ssh.CopyFileToNode(node, filename):
2020 logger.Error("Copy of file %s to node %s failed" % (filename, node))
2023 class LUDumpClusterConfig(NoHooksLU):
2024 """Return a text-representation of the cluster-config.
2029 def CheckPrereq(self):
2030 """No prerequisites.
2035 def Exec(self, feedback_fn):
2036 """Dump a representation of the cluster config to the standard output.
2039 return self.cfg.DumpConfig()
2042 class LURunClusterCommand(NoHooksLU):
2043 """Run a command on some nodes.
2046 _OP_REQP = ["command", "nodes"]
2048 def CheckPrereq(self):
2049 """Check prerequisites.
2051 It checks that the given list of nodes is valid.
2054 self.nodes = _GetWantedNodes(self, self.op.nodes)
2056 def Exec(self, feedback_fn):
2057 """Run a command on some nodes.
2060 # put the master at the end of the nodes list
2061 master_node = self.sstore.GetMasterNode()
2062 if master_node in self.nodes:
2063 self.nodes.remove(master_node)
2064 self.nodes.append(master_node)
2067 for node in self.nodes:
2068 result = ssh.SSHCall(node, "root", self.op.command)
2069 data.append((node, result.output, result.exit_code))
2074 class LUActivateInstanceDisks(NoHooksLU):
2075 """Bring up an instance's disks.
2078 _OP_REQP = ["instance_name"]
2080 def CheckPrereq(self):
2081 """Check prerequisites.
2083 This checks that the instance is in the cluster.
2086 instance = self.cfg.GetInstanceInfo(
2087 self.cfg.ExpandInstanceName(self.op.instance_name))
2088 if instance is None:
2089 raise errors.OpPrereqError("Instance '%s' not known" %
2090 self.op.instance_name)
2091 self.instance = instance
2094 def Exec(self, feedback_fn):
2095 """Activate the disks.
2098 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2100 raise errors.OpExecError("Cannot activate block devices")
2105 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2106 """Prepare the block devices for an instance.
2108 This sets up the block devices on all nodes.
2111 instance: a ganeti.objects.Instance object
2112 ignore_secondaries: if true, errors on secondary nodes won't result
2113 in an error return from the function
2116 false if the operation failed
2117 list of (host, instance_visible_name, node_visible_name) if the operation
2118 succeeded with the mapping from node devices to instance devices
2122 iname = instance.name
2123 # With the two passes mechanism we try to reduce the window of
2124 # opportunity for the race condition of switching DRBD to primary
2125 before handshaking occurred, but we do not eliminate it
2127 # The proper fix would be to wait (with some limits) until the
2128 # connection has been made and drbd transitions from WFConnection
2129 # into any other network-connected state (Connected, SyncTarget,
2132 # 1st pass, assemble on all nodes in secondary mode
2133 for inst_disk in instance.disks:
2134 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2135 cfg.SetDiskID(node_disk, node)
2136 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2138 logger.Error("could not prepare block device %s on node %s"
2139 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2140 if not ignore_secondaries:
2143 # FIXME: race condition on drbd migration to primary
2145 # 2nd pass, do only the primary node
2146 for inst_disk in instance.disks:
2147 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2148 if node != instance.primary_node:
2150 cfg.SetDiskID(node_disk, node)
2151 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2153 logger.Error("could not prepare block device %s on node %s"
2154 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2156 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2158 # leave the disks configured for the primary node
2159 # this is a workaround that would be fixed better by
2160 # improving the logical/physical id handling
2161 for disk in instance.disks:
2162 cfg.SetDiskID(disk, instance.primary_node)
2164 return disks_ok, device_info
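# Descriptive note added for clarity (an assumption about the typical shape of
# the result, not taken from the original code): for a two-disk instance with
# primary node "node1", the device_info list built above would look roughly
# like
#   [("node1", "sda", <result of call_blockdev_assemble for sda>),
#    ("node1", "sdb", <result of call_blockdev_assemble for sdb>)]
# i.e. one (node, instance-visible name, node-visible device) entry per disk,
# matching the return description in the docstring of _AssembleInstanceDisks.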
2167 def _StartInstanceDisks(cfg, instance, force):
2168 """Start the disks of an instance.
2171 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2172 ignore_secondaries=force)
2174 _ShutdownInstanceDisks(instance, cfg)
2175 if force is not None and not force:
2176 logger.Error("If the message above refers to a secondary node,"
2177 " you can retry the operation using '--force'.")
2178 raise errors.OpExecError("Disk consistency error")
2181 class LUDeactivateInstanceDisks(NoHooksLU):
2182 """Shutdown an instance's disks.
2185 _OP_REQP = ["instance_name"]
2187 def CheckPrereq(self):
2188 """Check prerequisites.
2190 This checks that the instance is in the cluster.
2193 instance = self.cfg.GetInstanceInfo(
2194 self.cfg.ExpandInstanceName(self.op.instance_name))
2195 if instance is None:
2196 raise errors.OpPrereqError("Instance '%s' not known" %
2197 self.op.instance_name)
2198 self.instance = instance
2200 def Exec(self, feedback_fn):
2201 """Deactivate the disks
2204 instance = self.instance
2205 ins_l = rpc.call_instance_list([instance.primary_node])
2206 ins_l = ins_l[instance.primary_node]
2207 if not type(ins_l) is list:
2208 raise errors.OpExecError("Can't contact node '%s'" %
2209 instance.primary_node)
2211 if self.instance.name in ins_l:
2212 raise errors.OpExecError("Instance is running, can't shutdown"
2215 _ShutdownInstanceDisks(instance, self.cfg)
2218 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2219 """Shutdown block devices of an instance.
2221 This does the shutdown on all nodes of the instance.
2223 If the ignore_primary is false, errors on the primary node are
2228 for disk in instance.disks:
2229 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2230 cfg.SetDiskID(top_disk, node)
2231 if not rpc.call_blockdev_shutdown(node, top_disk):
2232 logger.Error("could not shutdown block device %s on node %s" %
2233 (disk.iv_name, node))
2234 if not ignore_primary or node != instance.primary_node:
2239 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2240 """Checks if a node has enough free memory.
2242 This function checks if a given node has the needed amount of free
2243 memory. In case the node has less memory or we cannot get the
2244 information from the node, this function raises an OpPrereqError
2248 - cfg: a ConfigWriter instance
2249 - node: the node name
2250 - reason: string to use in the error message
2251 - requested: the amount of memory in MiB
2254 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2255 if not nodeinfo or not isinstance(nodeinfo, dict):
2256 raise errors.OpPrereqError("Could not contact node %s for resource"
2257 " information" % (node,))
2259 free_mem = nodeinfo[node].get('memory_free')
2260 if not isinstance(free_mem, int):
2261 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2262 " was '%s'" % (node, free_mem))
2263 if requested > free_mem:
2264 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2265 " needed %s MiB, available %s MiB" %
2266 (node, reason, requested, free_mem))
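# Usage sketch (taken from how this helper is called elsewhere in this
# module): an LU reserves memory on a node before committing to start or fail
# over an instance there, e.g.
#   _CheckNodeFreeMemory(self.cfg, target_node,
#                        "failing over instance %s" % instance.name,
#                        instance.memory)
# which raises errors.OpPrereqError if the node reports less free memory than
# requested (in MiB) or cannot be contacted at all.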
2269 class LUStartupInstance(LogicalUnit):
2270 """Starts an instance.
2273 HPATH = "instance-start"
2274 HTYPE = constants.HTYPE_INSTANCE
2275 _OP_REQP = ["instance_name", "force"]
2277 def BuildHooksEnv(self):
2280 This runs on master, primary and secondary nodes of the instance.
2284 "FORCE": self.op.force,
2286 env.update(_BuildInstanceHookEnvByObject(self.instance))
2287 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2288 list(self.instance.secondary_nodes))
2291 def CheckPrereq(self):
2292 """Check prerequisites.
2294 This checks that the instance is in the cluster.
2297 instance = self.cfg.GetInstanceInfo(
2298 self.cfg.ExpandInstanceName(self.op.instance_name))
2299 if instance is None:
2300 raise errors.OpPrereqError("Instance '%s' not known" %
2301 self.op.instance_name)
2303 # check bridges existence
2304 _CheckInstanceBridgesExist(instance)
2306 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2307 "starting instance %s" % instance.name,
2310 self.instance = instance
2311 self.op.instance_name = instance.name
2313 def Exec(self, feedback_fn):
2314 """Start the instance.
2317 instance = self.instance
2318 force = self.op.force
2319 extra_args = getattr(self.op, "extra_args", "")
2321 self.cfg.MarkInstanceUp(instance.name)
2323 node_current = instance.primary_node
2325 _StartInstanceDisks(self.cfg, instance, force)
2327 if not rpc.call_instance_start(node_current, instance, extra_args):
2328 _ShutdownInstanceDisks(instance, self.cfg)
2329 raise errors.OpExecError("Could not start instance")
2332 class LURebootInstance(LogicalUnit):
2333 """Reboot an instance.
2336 HPATH = "instance-reboot"
2337 HTYPE = constants.HTYPE_INSTANCE
2338 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2340 def BuildHooksEnv(self):
2343 This runs on master, primary and secondary nodes of the instance.
2347 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2349 env.update(_BuildInstanceHookEnvByObject(self.instance))
2350 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2351 list(self.instance.secondary_nodes))
2354 def CheckPrereq(self):
2355 """Check prerequisites.
2357 This checks that the instance is in the cluster.
2360 instance = self.cfg.GetInstanceInfo(
2361 self.cfg.ExpandInstanceName(self.op.instance_name))
2362 if instance is None:
2363 raise errors.OpPrereqError("Instance '%s' not known" %
2364 self.op.instance_name)
2366 # check bridges existence
2367 _CheckInstanceBridgesExist(instance)
2369 self.instance = instance
2370 self.op.instance_name = instance.name
2372 def Exec(self, feedback_fn):
2373 """Reboot the instance.
2376 instance = self.instance
2377 ignore_secondaries = self.op.ignore_secondaries
2378 reboot_type = self.op.reboot_type
2379 extra_args = getattr(self.op, "extra_args", "")
2381 node_current = instance.primary_node
2383 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2384 constants.INSTANCE_REBOOT_HARD,
2385 constants.INSTANCE_REBOOT_FULL]:
2386 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2387 (constants.INSTANCE_REBOOT_SOFT,
2388 constants.INSTANCE_REBOOT_HARD,
2389 constants.INSTANCE_REBOOT_FULL))
2391 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2392 constants.INSTANCE_REBOOT_HARD]:
2393 if not rpc.call_instance_reboot(node_current, instance,
2394 reboot_type, extra_args):
2395 raise errors.OpExecError("Could not reboot instance")
2397 if not rpc.call_instance_shutdown(node_current, instance):
2398 raise errors.OpExecError("could not shutdown instance for full reboot")
2399 _ShutdownInstanceDisks(instance, self.cfg)
2400 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2401 if not rpc.call_instance_start(node_current, instance, extra_args):
2402 _ShutdownInstanceDisks(instance, self.cfg)
2403 raise errors.OpExecError("Could not start instance for full reboot")
2405 self.cfg.MarkInstanceUp(instance.name)
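# Illustrative opcode usage (the opcode class name is an assumption; only the
# _OP_REQP fields above come from this module):
#   op = opcodes.OpRebootInstance(instance_name="instance1.example.com",
#                                 reboot_type=constants.INSTANCE_REBOOT_FULL,
#                                 ignore_secondaries=False)
# Soft and hard reboots are delegated to the node daemon via
# rpc.call_instance_reboot, while a full reboot is implemented above as a
# shutdown followed by disk deactivation/reactivation and a fresh start.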
2408 class LUShutdownInstance(LogicalUnit):
2409 """Shutdown an instance.
2412 HPATH = "instance-stop"
2413 HTYPE = constants.HTYPE_INSTANCE
2414 _OP_REQP = ["instance_name"]
2416 def BuildHooksEnv(self):
2419 This runs on master, primary and secondary nodes of the instance.
2422 env = _BuildInstanceHookEnvByObject(self.instance)
2423 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2424 list(self.instance.secondary_nodes))
2427 def CheckPrereq(self):
2428 """Check prerequisites.
2430 This checks that the instance is in the cluster.
2433 instance = self.cfg.GetInstanceInfo(
2434 self.cfg.ExpandInstanceName(self.op.instance_name))
2435 if instance is None:
2436 raise errors.OpPrereqError("Instance '%s' not known" %
2437 self.op.instance_name)
2438 self.instance = instance
2440 def Exec(self, feedback_fn):
2441 """Shutdown the instance.
2444 instance = self.instance
2445 node_current = instance.primary_node
2446 self.cfg.MarkInstanceDown(instance.name)
2447 if not rpc.call_instance_shutdown(node_current, instance):
2448 logger.Error("could not shutdown instance")
2450 _ShutdownInstanceDisks(instance, self.cfg)
2453 class LUReinstallInstance(LogicalUnit):
2454 """Reinstall an instance.
2457 HPATH = "instance-reinstall"
2458 HTYPE = constants.HTYPE_INSTANCE
2459 _OP_REQP = ["instance_name"]
2461 def BuildHooksEnv(self):
2464 This runs on master, primary and secondary nodes of the instance.
2467 env = _BuildInstanceHookEnvByObject(self.instance)
2468 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2469 list(self.instance.secondary_nodes))
2472 def CheckPrereq(self):
2473 """Check prerequisites.
2475 This checks that the instance is in the cluster and is not running.
2478 instance = self.cfg.GetInstanceInfo(
2479 self.cfg.ExpandInstanceName(self.op.instance_name))
2480 if instance is None:
2481 raise errors.OpPrereqError("Instance '%s' not known" %
2482 self.op.instance_name)
2483 if instance.disk_template == constants.DT_DISKLESS:
2484 raise errors.OpPrereqError("Instance '%s' has no disks" %
2485 self.op.instance_name)
2486 if instance.status != "down":
2487 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2488 self.op.instance_name)
2489 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2491 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2492 (self.op.instance_name,
2493 instance.primary_node))
2495 self.op.os_type = getattr(self.op, "os_type", None)
2496 if self.op.os_type is not None:
2498 pnode = self.cfg.GetNodeInfo(
2499 self.cfg.ExpandNodeName(instance.primary_node))
2501 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2503 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2505 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2506 " primary node" % self.op.os_type)
2508 self.instance = instance
2510 def Exec(self, feedback_fn):
2511 """Reinstall the instance.
2514 inst = self.instance
2516 if self.op.os_type is not None:
2517 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2518 inst.os = self.op.os_type
2519 self.cfg.AddInstance(inst)
2521 _StartInstanceDisks(self.cfg, inst, None)
2523 feedback_fn("Running the instance OS create scripts...")
2524 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2525 raise errors.OpExecError("Could not install OS for instance %s"
2527 (inst.name, inst.primary_node))
2529 _ShutdownInstanceDisks(inst, self.cfg)
2532 class LURenameInstance(LogicalUnit):
2533 """Rename an instance.
2536 HPATH = "instance-rename"
2537 HTYPE = constants.HTYPE_INSTANCE
2538 _OP_REQP = ["instance_name", "new_name"]
2540 def BuildHooksEnv(self):
2543 This runs on master, primary and secondary nodes of the instance.
2546 env = _BuildInstanceHookEnvByObject(self.instance)
2547 env["INSTANCE_NEW_NAME"] = self.op.new_name
2548 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2549 list(self.instance.secondary_nodes))
2552 def CheckPrereq(self):
2553 """Check prerequisites.
2555 This checks that the instance is in the cluster and is not running.
2558 instance = self.cfg.GetInstanceInfo(
2559 self.cfg.ExpandInstanceName(self.op.instance_name))
2560 if instance is None:
2561 raise errors.OpPrereqError("Instance '%s' not known" %
2562 self.op.instance_name)
2563 if instance.status != "down":
2564 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2565 self.op.instance_name)
2566 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2568 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2569 (self.op.instance_name,
2570 instance.primary_node))
2571 self.instance = instance
2573 # new name verification
2574 name_info = utils.HostInfo(self.op.new_name)
2576 self.op.new_name = new_name = name_info.name
2577 instance_list = self.cfg.GetInstanceList()
2578 if new_name in instance_list:
2579 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2582 if not getattr(self.op, "ignore_ip", False):
2583 command = ["fping", "-q", name_info.ip]
2584 result = utils.RunCmd(command)
2585 if not result.failed:
2586 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2587 (name_info.ip, new_name))
2590 def Exec(self, feedback_fn):
2591 """Reinstall the instance.
2594 inst = self.instance
2595 old_name = inst.name
2597 self.cfg.RenameInstance(inst.name, self.op.new_name)
2599 # re-read the instance from the configuration after rename
2600 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2602 _StartInstanceDisks(self.cfg, inst, None)
2604 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2606 msg = ("Could not run OS rename script for instance %s on node %s (but the"
2607 " instance has been renamed in Ganeti)" %
2608 (inst.name, inst.primary_node))
2611 _ShutdownInstanceDisks(inst, self.cfg)
2614 class LURemoveInstance(LogicalUnit):
2615 """Remove an instance.
2618 HPATH = "instance-remove"
2619 HTYPE = constants.HTYPE_INSTANCE
2620 _OP_REQP = ["instance_name", "ignore_failures"]
2622 def BuildHooksEnv(self):
2625 This runs on master, primary and secondary nodes of the instance.
2628 env = _BuildInstanceHookEnvByObject(self.instance)
2629 nl = [self.sstore.GetMasterNode()]
2632 def CheckPrereq(self):
2633 """Check prerequisites.
2635 This checks that the instance is in the cluster.
2638 instance = self.cfg.GetInstanceInfo(
2639 self.cfg.ExpandInstanceName(self.op.instance_name))
2640 if instance is None:
2641 raise errors.OpPrereqError("Instance '%s' not known" %
2642 self.op.instance_name)
2643 self.instance = instance
2645 def Exec(self, feedback_fn):
2646 """Remove the instance.
2649 instance = self.instance
2650 logger.Info("shutting down instance %s on node %s" %
2651 (instance.name, instance.primary_node))
2653 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2654 if self.op.ignore_failures:
2655 feedback_fn("Warning: can't shutdown instance")
2657 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2658 (instance.name, instance.primary_node))
2660 logger.Info("removing block devices for instance %s" % instance.name)
2662 if not _RemoveDisks(instance, self.cfg):
2663 if self.op.ignore_failures:
2664 feedback_fn("Warning: can't remove instance's disks")
2666 raise errors.OpExecError("Can't remove instance's disks")
2668 logger.Info("removing instance %s out of cluster config" % instance.name)
2670 self.cfg.RemoveInstance(instance.name)
2673 class LUQueryInstances(NoHooksLU):
2674 """Logical unit for querying instances.
2677 _OP_REQP = ["output_fields", "names"]
2679 def CheckPrereq(self):
2680 """Check prerequisites.
2682 This checks that the fields required are valid output fields.
2685 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2686 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2687 "admin_state", "admin_ram",
2688 "disk_template", "ip", "mac", "bridge",
2689 "sda_size", "sdb_size", "vcpus"],
2690 dynamic=self.dynamic_fields,
2691 selected=self.op.output_fields)
2693 self.wanted = _GetWantedInstances(self, self.op.names)
2695 def Exec(self, feedback_fn):
2696 """Computes the list of nodes and their attributes.
2699 instance_names = self.wanted
2700 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2703 # begin data gathering
2705 nodes = frozenset([inst.primary_node for inst in instance_list])
2708 if self.dynamic_fields.intersection(self.op.output_fields):
2710 node_data = rpc.call_all_instances_info(nodes)
2712 result = node_data[name]
2714 live_data.update(result)
2715 elif result == False:
2716 bad_nodes.append(name)
2717 # else no instance is alive
2719 live_data = dict([(name, {}) for name in instance_names])
2721 # end data gathering
2724 for instance in instance_list:
2726 for field in self.op.output_fields:
2731 elif field == "pnode":
2732 val = instance.primary_node
2733 elif field == "snodes":
2734 val = list(instance.secondary_nodes)
2735 elif field == "admin_state":
2736 val = (instance.status != "down")
2737 elif field == "oper_state":
2738 if instance.primary_node in bad_nodes:
2741 val = bool(live_data.get(instance.name))
2742 elif field == "status":
2743 if instance.primary_node in bad_nodes:
2744 val = "ERROR_nodedown"
2746 running = bool(live_data.get(instance.name))
2748 if instance.status != "down":
2753 if instance.status != "down":
2757 elif field == "admin_ram":
2758 val = instance.memory
2759 elif field == "oper_ram":
2760 if instance.primary_node in bad_nodes:
2762 elif instance.name in live_data:
2763 val = live_data[instance.name].get("memory", "?")
2766 elif field == "disk_template":
2767 val = instance.disk_template
2769 val = instance.nics[0].ip
2770 elif field == "bridge":
2771 val = instance.nics[0].bridge
2772 elif field == "mac":
2773 val = instance.nics[0].mac
2774 elif field == "sda_size" or field == "sdb_size":
2775 disk = instance.FindDisk(field[:3])
2780 elif field == "vcpus":
2781 val = instance.vcpus
2783 raise errors.ParameterError(field)
2790 class LUFailoverInstance(LogicalUnit):
2791 """Failover an instance.
2794 HPATH = "instance-failover"
2795 HTYPE = constants.HTYPE_INSTANCE
2796 _OP_REQP = ["instance_name", "ignore_consistency"]
2798 def BuildHooksEnv(self):
2801 This runs on master, primary and secondary nodes of the instance.
2805 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2807 env.update(_BuildInstanceHookEnvByObject(self.instance))
2808 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2811 def CheckPrereq(self):
2812 """Check prerequisites.
2814 This checks that the instance is in the cluster.
2817 instance = self.cfg.GetInstanceInfo(
2818 self.cfg.ExpandInstanceName(self.op.instance_name))
2819 if instance is None:
2820 raise errors.OpPrereqError("Instance '%s' not known" %
2821 self.op.instance_name)
2823 if instance.disk_template not in constants.DTS_NET_MIRROR:
2824 raise errors.OpPrereqError("Instance's disk layout is not"
2825 " network mirrored, cannot failover.")
2827 secondary_nodes = instance.secondary_nodes
2828 if not secondary_nodes:
2829 raise errors.ProgrammerError("no secondary node but using "
2830 "DT_REMOTE_RAID1 template")
2832 target_node = secondary_nodes[0]
2833 # check memory requirements on the secondary node
2834 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2835 instance.name, instance.memory)
2837 # check bridge existence
2838 brlist = [nic.bridge for nic in instance.nics]
2839 if not rpc.call_bridges_exist(target_node, brlist):
2840 raise errors.OpPrereqError("One or more target bridges %s do not"
2841 " exist on destination node '%s'" %
2842 (brlist, target_node))
2844 self.instance = instance
2846 def Exec(self, feedback_fn):
2847 """Failover an instance.
2849 The failover is done by shutting it down on its present node and
2850 starting it on the secondary.
2853 instance = self.instance
2855 source_node = instance.primary_node
2856 target_node = instance.secondary_nodes[0]
2858 feedback_fn("* checking disk consistency between source and target")
2859 for dev in instance.disks:
2860 # for remote_raid1, these are md over drbd
2861 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2862 if instance.status == "up" and not self.op.ignore_consistency:
2863 raise errors.OpExecError("Disk %s is degraded on target node,"
2864 " aborting failover." % dev.iv_name)
2866 feedback_fn("* shutting down instance on source node")
2867 logger.Info("Shutting down instance %s on node %s" %
2868 (instance.name, source_node))
2870 if not rpc.call_instance_shutdown(source_node, instance):
2871 if self.op.ignore_consistency:
2872 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2873 " anyway. Please make sure node %s is down" %
2874 (instance.name, source_node, source_node))
2876 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2877 (instance.name, source_node))
2879 feedback_fn("* deactivating the instance's disks on source node")
2880 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2881 raise errors.OpExecError("Can't shut down the instance's disks.")
2883 instance.primary_node = target_node
2884 # distribute new instance config to the other nodes
2885 self.cfg.Update(instance)
2887 # Only start the instance if it's marked as up
2888 if instance.status == "up":
2889 feedback_fn("* activating the instance's disks on target node")
2890 logger.Info("Starting instance %s on node %s" %
2891 (instance.name, target_node))
2893 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2894 ignore_secondaries=True)
2896 _ShutdownInstanceDisks(instance, self.cfg)
2897 raise errors.OpExecError("Can't activate the instance's disks")
2899 feedback_fn("* starting the instance on the target node")
2900 if not rpc.call_instance_start(target_node, instance, None):
2901 _ShutdownInstanceDisks(instance, self.cfg)
2902 raise errors.OpExecError("Could not start instance %s on node %s." %
2903 (instance.name, target_node))
2906 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2907 """Create a tree of block devices on the primary node.
2909 This always creates all devices.
2913 for child in device.children:
2914 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2917 cfg.SetDiskID(device, node)
2918 new_id = rpc.call_blockdev_create(node, device, device.size,
2919 instance.name, True, info)
2922 if device.physical_id is None:
2923 device.physical_id = new_id
2927 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2928 """Create a tree of block devices on a secondary node.
2930 If this device type has to be created on secondaries, create it and
2933 If not, just recurse to children keeping the same 'force' value.
2936 if device.CreateOnSecondary():
2939 for child in device.children:
2940 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2941 child, force, info):
2946 cfg.SetDiskID(device, node)
2947 new_id = rpc.call_blockdev_create(node, device, device.size,
2948 instance.name, False, info)
2951 if device.physical_id is None:
2952 device.physical_id = new_id
2956 def _GenerateUniqueNames(cfg, exts):
2957 """Generate a suitable LV name.
2959 This will generate a logical volume name for the given instance.
2964 new_id = cfg.GenerateUniqueID()
2965 results.append("%s%s" % (new_id, val))
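# Illustrative example (an assumption about typical output, not from the
# original code): with exts == [".sda", ".sdb"] this produces names of the
# form
#   ["<unique-id>.sda", "<unique-id>.sdb"]
# where each <unique-id> comes from cfg.GenerateUniqueID(), so the resulting
# logical volume names are unique cluster-wide.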
2969 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2970 """Generate a drbd device complete with its children.
2973 port = cfg.AllocatePort()
2974 vgname = cfg.GetVGName()
2975 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2976 logical_id=(vgname, names[0]))
2977 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2978 logical_id=(vgname, names[1]))
2979 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2980 logical_id = (primary, secondary, port),
2981 children = [dev_data, dev_meta])
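# Descriptive comment added for clarity (not in the original source): the
# _Generate*DRBDBranch helpers pair a data LV of the requested size with a
# fixed 128 MiB metadata LV; both become children of the DRBD device, whose
# logical_id is the (primary, secondary, port) triple, with the port taken
# from the cluster-wide pool via cfg.AllocatePort().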
2985 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2986 """Generate a drbd8 device complete with its children.
2989 port = cfg.AllocatePort()
2990 vgname = cfg.GetVGName()
2991 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2992 logical_id=(vgname, names[0]))
2993 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2994 logical_id=(vgname, names[1]))
2995 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2996 logical_id = (primary, secondary, port),
2997 children = [dev_data, dev_meta],
3001 def _GenerateDiskTemplate(cfg, template_name,
3002 instance_name, primary_node,
3003 secondary_nodes, disk_sz, swap_sz):
3004 """Generate the entire disk layout for a given template type.
3007 #TODO: compute space requirements
3009 vgname = cfg.GetVGName()
3010 if template_name == constants.DT_DISKLESS:
3012 elif template_name == constants.DT_PLAIN:
3013 if len(secondary_nodes) != 0:
3014 raise errors.ProgrammerError("Wrong template configuration")
3016 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3017 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3018 logical_id=(vgname, names[0]),
3020 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3021 logical_id=(vgname, names[1]),
3023 disks = [sda_dev, sdb_dev]
3024 elif template_name == constants.DT_LOCAL_RAID1:
3025 if len(secondary_nodes) != 0:
3026 raise errors.ProgrammerError("Wrong template configuration")
3029 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
3030 ".sdb_m1", ".sdb_m2"])
3031 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3032 logical_id=(vgname, names[0]))
3033 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3034 logical_id=(vgname, names[1]))
3035 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
3037 children = [sda_dev_m1, sda_dev_m2])
3038 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3039 logical_id=(vgname, names[2]))
3040 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3041 logical_id=(vgname, names[3]))
3042 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
3044 children = [sdb_dev_m1, sdb_dev_m2])
3045 disks = [md_sda_dev, md_sdb_dev]
3046 elif template_name == constants.DT_REMOTE_RAID1:
3047 if len(secondary_nodes) != 1:
3048 raise errors.ProgrammerError("Wrong template configuration")
3049 remote_node = secondary_nodes[0]
3050 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3051 ".sdb_data", ".sdb_meta"])
3052 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3053 disk_sz, names[0:2])
3054 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
3055 children = [drbd_sda_dev], size=disk_sz)
3056 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3057 swap_sz, names[2:4])
3058 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
3059 children = [drbd_sdb_dev], size=swap_sz)
3060 disks = [md_sda_dev, md_sdb_dev]
3061 elif template_name == constants.DT_DRBD8:
3062 if len(secondary_nodes) != 1:
3063 raise errors.ProgrammerError("Wrong template configuration")
3064 remote_node = secondary_nodes[0]
3065 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3066 ".sdb_data", ".sdb_meta"])
3067 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3068 disk_sz, names[0:2], "sda")
3069 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3070 swap_sz, names[2:4], "sdb")
3071 disks = [drbd_sda_dev, drbd_sdb_dev]
3073 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
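# Illustrative example (an assumption about typical usage, not from the
# original code): for a plain LVM instance the call
#   disks = _GenerateDiskTemplate(cfg, constants.DT_PLAIN,
#                                 "instance1.example.com", "node1", [],
#                                 10240, 4096)
# returns two LV-backed Disk objects ("sda" of 10240 MiB and "sdb" of
# 4096 MiB), while constants.DT_DRBD8 with a single secondary node returns
# two DRBD8 devices, each wrapping a data LV plus a 128 MiB metadata LV.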
3077 def _GetInstanceInfoText(instance):
3078 Compute the text that should be added to the disk's metadata.
3081 return "originstname+%s" % instance.name
3084 def _CreateDisks(cfg, instance):
3085 """Create all disks for an instance.
3087 This abstracts away some work from AddInstance.
3090 instance: the instance object
3093 True or False showing the success of the creation process
3096 info = _GetInstanceInfoText(instance)
3098 for device in instance.disks:
3099 logger.Info("creating volume %s for instance %s" %
3100 (device.iv_name, instance.name))
3102 for secondary_node in instance.secondary_nodes:
3103 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3104 device, False, info):
3105 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3106 (device.iv_name, device, secondary_node))
3109 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3110 instance, device, info):
3111 logger.Error("failed to create volume %s on primary!" %
3117 def _RemoveDisks(instance, cfg):
3118 """Remove all disks for an instance.
3120 This abstracts away some work from `AddInstance()` and
3121 `RemoveInstance()`. Note that in case some of the devices couldn't
3122 be removed, the removal will continue with the other ones (compare
3123 with `_CreateDisks()`).
3126 instance: the instance object
3129 True or False showing the success of the removal process
3132 logger.Info("removing block devices for instance %s" % instance.name)
3135 for device in instance.disks:
3136 for node, disk in device.ComputeNodeTree(instance.primary_node):
3137 cfg.SetDiskID(disk, node)
3138 if not rpc.call_blockdev_remove(node, disk):
3139 logger.Error("could not remove block device %s on node %s,"
3140 " continuing anyway" %
3141 (device.iv_name, node))
3146 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3147 """Compute disk size requirements in the volume group
3149 This is currently hard-coded for the two-drive layout.
3152 # Required free disk space as a function of disk and swap space
3154 constants.DT_DISKLESS: None,
3155 constants.DT_PLAIN: disk_size + swap_size,
3156 constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3157 # 256 MB are added for drbd metadata, 128 MB for each drbd device
3158 constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3159 constants.DT_DRBD8: disk_size + swap_size + 256,
3162 if disk_template not in req_size_dict:
3163 raise errors.ProgrammerError("Disk template '%s' size requirement"
3164 " is unknown" % disk_template)
3166 return req_size_dict[disk_template]
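# The helper below is an illustrative sketch added for documentation purposes
# only (it is not part of the original module); it shows, via assertions, what
# _ComputeDiskSize returns for each disk template.
def _ExampleComputeDiskSize():
  """Worked example for _ComputeDiskSize (illustrative only)."""
  # A 10240 MiB disk plus 4096 MiB of swap:
  #  - plain LVM needs just the sum of the two volumes
  assert _ComputeDiskSize(constants.DT_PLAIN, 10240, 4096) == 14336
  #  - local raid1 mirrors both volumes locally, doubling the requirement
  assert _ComputeDiskSize(constants.DT_LOCAL_RAID1, 10240, 4096) == 28672
  #  - the DRBD-based templates add 256 MB for the drbd metadata volumes
  assert _ComputeDiskSize(constants.DT_REMOTE_RAID1, 10240, 4096) == 14592
  assert _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096) == 14592
  #  - diskless instances need no space in the volume group at all
  assert _ComputeDiskSize(constants.DT_DISKLESS, 10240, 4096) is None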
3169 class LUCreateInstance(LogicalUnit):
3170 """Create an instance.
3173 HPATH = "instance-add"
3174 HTYPE = constants.HTYPE_INSTANCE
3175 _OP_REQP = ["instance_name", "mem_size", "disk_size",
3176 "disk_template", "swap_size", "mode", "start", "vcpus",
3177 "wait_for_sync", "ip_check", "mac"]
3179 def _RunAllocator(self):
3180 """Run the allocator based on input opcode.
3183 disks = [{"size": self.op.disk_size, "mode": "w"},
3184 {"size": self.op.swap_size, "mode": "w"}]
3185 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3186 "bridge": self.op.bridge}]
3187 ial = IAllocator(self.cfg, self.sstore,
3188 mode=constants.IALLOCATOR_MODE_ALLOC,
3189 name=self.op.instance_name,
3190 disk_template=self.op.disk_template,
3193 vcpus=self.op.vcpus,
3194 mem_size=self.op.mem_size,
3199 ial.Run(self.op.iallocator)
3202 raise errors.OpPrereqError("Can't compute nodes using"
3203 " iallocator '%s': %s" % (self.op.iallocator,
3205 if len(ial.nodes) != ial.required_nodes:
3206 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3207 " of nodes (%s), required %s" %
3208 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3209 self.op.pnode = ial.nodes[0]
3210 logger.ToStdout("Selected nodes for the instance: %s" %
3211 (", ".join(ial.nodes),))
3212 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3213 (self.op.instance_name, self.op.iallocator, ial.nodes))
3214 if ial.required_nodes == 2:
3215 self.op.snode = ial.nodes[1]
3217 def BuildHooksEnv(self):
3220 This runs on master, primary and secondary nodes of the instance.
3224 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3225 "INSTANCE_DISK_SIZE": self.op.disk_size,
3226 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3227 "INSTANCE_ADD_MODE": self.op.mode,
3229 if self.op.mode == constants.INSTANCE_IMPORT:
3230 env["INSTANCE_SRC_NODE"] = self.op.src_node
3231 env["INSTANCE_SRC_PATH"] = self.op.src_path
3232 env["INSTANCE_SRC_IMAGE"] = self.src_image
3234 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3235 primary_node=self.op.pnode,
3236 secondary_nodes=self.secondaries,
3237 status=self.instance_status,
3238 os_type=self.op.os_type,
3239 memory=self.op.mem_size,
3240 vcpus=self.op.vcpus,
3241 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3244 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3249 def CheckPrereq(self):
3250 """Check prerequisites.
3253 # set optional parameters to none if they don't exist
3254 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3255 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3256 "vnc_bind_address"]:
3257 if not hasattr(self.op, attr):
3258 setattr(self.op, attr, None)
3260 if self.op.mode not in (constants.INSTANCE_CREATE,
3261 constants.INSTANCE_IMPORT):
3262 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3265 if self.op.mode == constants.INSTANCE_IMPORT:
3266 src_node = getattr(self.op, "src_node", None)
3267 src_path = getattr(self.op, "src_path", None)
3268 if src_node is None or src_path is None:
3269 raise errors.OpPrereqError("Importing an instance requires source"
3270 " node and path options")
3271 src_node_full = self.cfg.ExpandNodeName(src_node)
3272 if src_node_full is None:
3273 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3274 self.op.src_node = src_node = src_node_full
3276 if not os.path.isabs(src_path):
3277 raise errors.OpPrereqError("The source path must be absolute")
3279 export_info = rpc.call_export_info(src_node, src_path)
3282 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3284 if not export_info.has_section(constants.INISECT_EXP):
3285 raise errors.ProgrammerError("Corrupted export config")
3287 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3288 if (int(ei_version) != constants.EXPORT_VERSION):
3289 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3290 (ei_version, constants.EXPORT_VERSION))
3292 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3293 raise errors.OpPrereqError("Can't import instance with more than"
3296 # FIXME: are the old os-es, disk sizes, etc. useful?
3297 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3298 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3300 self.src_image = diskimage
3301 else: # INSTANCE_CREATE
3302 if getattr(self.op, "os_type", None) is None:
3303 raise errors.OpPrereqError("No guest OS specified")
3305 #### instance parameters check
3307 # disk template and mirror node verification
3308 if self.op.disk_template not in constants.DISK_TEMPLATES:
3309 raise errors.OpPrereqError("Invalid disk template name")
3311 # instance name verification
3312 hostname1 = utils.HostInfo(self.op.instance_name)
3314 self.op.instance_name = instance_name = hostname1.name
3315 instance_list = self.cfg.GetInstanceList()
3316 if instance_name in instance_list:
3317 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3320 # ip validity checks
3321 ip = getattr(self.op, "ip", None)
3322 if ip is None or ip.lower() == "none":
3324 elif ip.lower() == "auto":
3325 inst_ip = hostname1.ip
3327 if not utils.IsValidIP(ip):
3328 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3329 " like a valid IP" % ip)
3331 self.inst_ip = self.op.ip = inst_ip
3333 if self.op.start and not self.op.ip_check:
3334 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3335 " adding an instance in start mode")
3337 if self.op.ip_check:
3338 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3339 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3340 (hostname1.ip, instance_name))
3342 # MAC address verification
3343 if self.op.mac != "auto":
3344 if not utils.IsValidMac(self.op.mac.lower()):
3345 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3348 # bridge verification
3349 bridge = getattr(self.op, "bridge", None)
3351 self.op.bridge = self.cfg.GetDefBridge()
3353 self.op.bridge = bridge
3355 # boot order verification
3356 if self.op.hvm_boot_order is not None:
3357 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3358 raise errors.OpPrereqError("invalid boot order specified,"
3359 " must be one or more of [acdn]")
3362 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3363 raise errors.OpPrereqError("One and only one of iallocator and primary"
3364 " node must be given")
3366 if self.op.iallocator is not None:
3367 self._RunAllocator()
3369 #### node related checks
3371 # check primary node
3372 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3374 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3376 self.op.pnode = pnode.name
3378 self.secondaries = []
3380 # mirror node verification
3381 if self.op.disk_template in constants.DTS_NET_MIRROR:
3382 if getattr(self.op, "snode", None) is None:
3383 raise errors.OpPrereqError("The networked disk templates need"
3386 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3387 if snode_name is None:
3388 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3390 elif snode_name == pnode.name:
3391 raise errors.OpPrereqError("The secondary node cannot be"
3392 " the primary node.")
3393 self.secondaries.append(snode_name)
3395 req_size = _ComputeDiskSize(self.op.disk_template,
3396 self.op.disk_size, self.op.swap_size)
3398 # Check lv size requirements
3399 if req_size is not None:
3400 nodenames = [pnode.name] + self.secondaries
3401 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3402 for node in nodenames:
3403 info = nodeinfo.get(node, None)
3405 raise errors.OpPrereqError("Cannot get current information"
3406 " from node '%s'" % node)
3407 vg_free = info.get('vg_free', None)
3408 if not isinstance(vg_free, int):
3409 raise errors.OpPrereqError("Can't compute free disk space on"
3411 if req_size > info['vg_free']:
3412 raise errors.OpPrereqError("Not enough disk space on target node %s."
3413 " %d MB available, %d MB required" %
3414 (node, info['vg_free'], req_size))
3417 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3419 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3420 " primary node" % self.op.os_type)
3422 if self.op.kernel_path == constants.VALUE_NONE:
3423 raise errors.OpPrereqError("Can't set instance kernel to none")
3426 # bridge check on primary node
3427 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3428 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3429 " destination node '%s'" %
3430 (self.op.bridge, pnode.name))
3432 # memory check on primary node
3434 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3435 "creating instance %s" % self.op.instance_name,
3438 # hvm_cdrom_image_path verification
3439 if self.op.hvm_cdrom_image_path is not None:
3440 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3441 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3442 " be an absolute path or None, not %s" %
3443 self.op.hvm_cdrom_image_path)
3444 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3445 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3446 " regular file or a symlink pointing to"
3447 " an existing regular file, not %s" %
3448 self.op.hvm_cdrom_image_path)
3450 # vnc_bind_address verification
3451 if self.op.vnc_bind_address is not None:
3452 if not utils.IsValidIP(self.op.vnc_bind_address):
3453 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3454 " like a valid IP address" %
3455 self.op.vnc_bind_address)
3458 self.instance_status = 'up'
3460 self.instance_status = 'down'
3462 def Exec(self, feedback_fn):
3463 """Create and add the instance to the cluster.
3466 instance = self.op.instance_name
3467 pnode_name = self.pnode.name
3469 if self.op.mac == "auto":
3470 mac_address = self.cfg.GenerateMAC()
3472 mac_address = self.op.mac
3474 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3475 if self.inst_ip is not None:
3476 nic.ip = self.inst_ip
3478 ht_kind = self.sstore.GetHypervisorType()
3479 if ht_kind in constants.HTS_REQ_PORT:
3480 network_port = self.cfg.AllocatePort()
3484 if self.op.vnc_bind_address is None:
3485 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3487 disks = _GenerateDiskTemplate(self.cfg,
3488 self.op.disk_template,
3489 instance, pnode_name,
3490 self.secondaries, self.op.disk_size,
3493 iobj = objects.Instance(name=instance, os=self.op.os_type,
3494 primary_node=pnode_name,
3495 memory=self.op.mem_size,
3496 vcpus=self.op.vcpus,
3497 nics=[nic], disks=disks,
3498 disk_template=self.op.disk_template,
3499 status=self.instance_status,
3500 network_port=network_port,
3501 kernel_path=self.op.kernel_path,
3502 initrd_path=self.op.initrd_path,
3503 hvm_boot_order=self.op.hvm_boot_order,
3504 hvm_acpi=self.op.hvm_acpi,
3505 hvm_pae=self.op.hvm_pae,
3506 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3507 vnc_bind_address=self.op.vnc_bind_address,
3510 feedback_fn("* creating instance disks...")
3511 if not _CreateDisks(self.cfg, iobj):
3512 _RemoveDisks(iobj, self.cfg)
3513 raise errors.OpExecError("Device creation failed, reverting...")
3515 feedback_fn("adding instance %s to cluster config" % instance)
3517 self.cfg.AddInstance(iobj)
3519 if self.op.wait_for_sync:
3520 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3521 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3522 # make sure the disks are not degraded (still sync-ing is ok)
3524 feedback_fn("* checking mirrors status")
3525 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3530 _RemoveDisks(iobj, self.cfg)
3531 self.cfg.RemoveInstance(iobj.name)
3532 raise errors.OpExecError("There are some degraded disks for"
3535 feedback_fn("creating os for instance %s on node %s" %
3536 (instance, pnode_name))
3538 if iobj.disk_template != constants.DT_DISKLESS:
3539 if self.op.mode == constants.INSTANCE_CREATE:
3540 feedback_fn("* running the instance OS create scripts...")
3541 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3542 raise errors.OpExecError("could not add os for instance %s"
3544 (instance, pnode_name))
3546 elif self.op.mode == constants.INSTANCE_IMPORT:
3547 feedback_fn("* running the instance OS import scripts...")
3548 src_node = self.op.src_node
3549 src_image = self.src_image
3550 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3551 src_node, src_image):
3552 raise errors.OpExecError("Could not import os for instance"
3554 (instance, pnode_name))
3556 # also checked in the prereq part
3557 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3561 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3562 feedback_fn("* starting instance...")
3563 if not rpc.call_instance_start(pnode_name, iobj, None):
3564 raise errors.OpExecError("Could not start instance")
3567 class LUConnectConsole(NoHooksLU):
3568 """Connect to an instance's console.
3570 This is somewhat special in that it returns the command line that
3571 you need to run on the master node in order to connect to the
3575 _OP_REQP = ["instance_name"]
3577 def CheckPrereq(self):
3578 """Check prerequisites.
3580 This checks that the instance is in the cluster.
3583 instance = self.cfg.GetInstanceInfo(
3584 self.cfg.ExpandInstanceName(self.op.instance_name))
3585 if instance is None:
3586 raise errors.OpPrereqError("Instance '%s' not known" %
3587 self.op.instance_name)
3588 self.instance = instance
3590 def Exec(self, feedback_fn):
3591 """Connect to the console of an instance
3594 instance = self.instance
3595 node = instance.primary_node
3597 node_insts = rpc.call_instance_list([node])[node]
3598 if node_insts is False:
3599 raise errors.OpExecError("Can't connect to node %s." % node)
3601 if instance.name not in node_insts:
3602 raise errors.OpExecError("Instance %s is not running." % instance.name)
3604 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3606 hyper = hypervisor.GetHypervisor()
3607 console_cmd = hyper.GetShellCommandForConsole(instance)
3609 argv = ["ssh", "-q", "-t"]
3610 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3611 argv.extend(ssh.BATCH_MODE_OPTS)
3613 argv.append(console_cmd)
3617 class LUAddMDDRBDComponent(LogicalUnit):
3618 """Adda new mirror member to an instance's disk.
3621 HPATH = "mirror-add"
3622 HTYPE = constants.HTYPE_INSTANCE
3623 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3625 def BuildHooksEnv(self):
3628 This runs on the master, the primary and all the secondaries.
3632 "NEW_SECONDARY": self.op.remote_node,
3633 "DISK_NAME": self.op.disk_name,
3635 env.update(_BuildInstanceHookEnvByObject(self.instance))
3636 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3637 self.op.remote_node,] + list(self.instance.secondary_nodes)
3640 def CheckPrereq(self):
3641 """Check prerequisites.
3643 This checks that the instance is in the cluster.
3646 instance = self.cfg.GetInstanceInfo(
3647 self.cfg.ExpandInstanceName(self.op.instance_name))
3648 if instance is None:
3649 raise errors.OpPrereqError("Instance '%s' not known" %
3650 self.op.instance_name)
3651 self.instance = instance
3653 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3654 if remote_node is None:
3655 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3656 self.remote_node = remote_node
3658 if remote_node == instance.primary_node:
3659 raise errors.OpPrereqError("The specified node is the primary node of"
3662 if instance.disk_template != constants.DT_REMOTE_RAID1:
3663 raise errors.OpPrereqError("Instance's disk layout is not"
3665 for disk in instance.disks:
3666 if disk.iv_name == self.op.disk_name:
3669 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3670 " instance." % self.op.disk_name)
3671 if len(disk.children) > 1:
3672 raise errors.OpPrereqError("The device already has two slave devices."
3673 " This would create a 3-disk raid1 which we"
3677 def Exec(self, feedback_fn):
3678 """Add the mirror component
3682 instance = self.instance
3684 remote_node = self.remote_node
3685 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3686 names = _GenerateUniqueNames(self.cfg, lv_names)
3687 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3688 remote_node, disk.size, names)
3690 logger.Info("adding new mirror component on secondary")
3692 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3694 _GetInstanceInfoText(instance)):
3695 raise errors.OpExecError("Failed to create new component on secondary"
3696 " node %s" % remote_node)
3698 logger.Info("adding new mirror component on primary")
3700 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3702 _GetInstanceInfoText(instance)):
3703 # remove secondary dev
3704 self.cfg.SetDiskID(new_drbd, remote_node)
3705 rpc.call_blockdev_remove(remote_node, new_drbd)
3706 raise errors.OpExecError("Failed to create volume on primary")
3708 # the device exists now
3709 # call the primary node to add the mirror to md
3710 logger.Info("adding new mirror component to md")
3711 if not rpc.call_blockdev_addchildren(instance.primary_node,
3713 logger.Error("Can't add mirror component to md!")
3714 self.cfg.SetDiskID(new_drbd, remote_node)
3715 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3716 logger.Error("Can't rollback on secondary")
3717 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3718 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3719 logger.Error("Can't rollback on primary")
3720 raise errors.OpExecError("Can't add mirror component to md array")
3722 disk.children.append(new_drbd)
3724 self.cfg.AddInstance(instance)
3726 _WaitForSync(self.cfg, instance, self.proc)
3731 class LURemoveMDDRBDComponent(LogicalUnit):
3732 """Remove a component from a remote_raid1 disk.
3735 HPATH = "mirror-remove"
3736 HTYPE = constants.HTYPE_INSTANCE
3737 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3739 def BuildHooksEnv(self):
3742 This runs on the master, the primary and all the secondaries.
3746 "DISK_NAME": self.op.disk_name,
3747 "DISK_ID": self.op.disk_id,
3748 "OLD_SECONDARY": self.old_secondary,
3750 env.update(_BuildInstanceHookEnvByObject(self.instance))
3751 nl = [self.sstore.GetMasterNode(),
3752 self.instance.primary_node] + list(self.instance.secondary_nodes)
3755 def CheckPrereq(self):
3756 """Check prerequisites.
3758 This checks that the instance is in the cluster.
3761 instance = self.cfg.GetInstanceInfo(
3762 self.cfg.ExpandInstanceName(self.op.instance_name))
3763 if instance is None:
3764 raise errors.OpPrereqError("Instance '%s' not known" %
3765 self.op.instance_name)
3766 self.instance = instance
3768 if instance.disk_template != constants.DT_REMOTE_RAID1:
3769 raise errors.OpPrereqError("Instance's disk layout is not"
3771 for disk in instance.disks:
3772 if disk.iv_name == self.op.disk_name:
3775 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3776 " instance." % self.op.disk_name)
3777 for child in disk.children:
3778 if (child.dev_type == constants.LD_DRBD7 and
3779 child.logical_id[2] == self.op.disk_id):
3782 raise errors.OpPrereqError("Can't find the device with this port.")
3784 if len(disk.children) < 2:
3785 raise errors.OpPrereqError("Cannot remove the last component from"
3789 if self.child.logical_id[0] == instance.primary_node:
3793 self.old_secondary = self.child.logical_id[oid]
3795 def Exec(self, feedback_fn):
3796 """Remove the mirror component
3799 instance = self.instance
3802 logger.Info("remove mirror component")
3803 self.cfg.SetDiskID(disk, instance.primary_node)
3804 if not rpc.call_blockdev_removechildren(instance.primary_node,
3806 raise errors.OpExecError("Can't remove child from mirror.")
3808 for node in child.logical_id[:2]:
3809 self.cfg.SetDiskID(child, node)
3810 if not rpc.call_blockdev_remove(node, child):
3811 logger.Error("Warning: failed to remove device from node %s,"
3812 " continuing operation." % node)
3814 disk.children.remove(child)
3815 self.cfg.AddInstance(instance)
3818 class LUReplaceDisks(LogicalUnit):
3819 """Replace the disks of an instance.
3822 HPATH = "mirrors-replace"
3823 HTYPE = constants.HTYPE_INSTANCE
3824 _OP_REQP = ["instance_name", "mode", "disks"]
3826 def _RunAllocator(self):
3827 """Compute a new secondary node using an IAllocator.
3830 ial = IAllocator(self.cfg, self.sstore,
3831 mode=constants.IALLOCATOR_MODE_RELOC,
3832 name=self.op.instance_name,
3833 relocate_from=[self.sec_node])
3835 ial.Run(self.op.iallocator)
3838 raise errors.OpPrereqError("Can't compute nodes using"
3839 " iallocator '%s': %s" % (self.op.iallocator,
3841 if len(ial.nodes) != ial.required_nodes:
3842 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3843 " of nodes (%s), required %s" %
3844 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3845 self.op.remote_node = ial.nodes[0]
3846 logger.ToStdout("Selected new secondary for the instance: %s" %
3847 self.op.remote_node)
3849 def BuildHooksEnv(self):
3852 This runs on the master, the primary and all the secondaries.
3856 "MODE": self.op.mode,
3857 "NEW_SECONDARY": self.op.remote_node,
3858 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3860 env.update(_BuildInstanceHookEnvByObject(self.instance))
3862 self.sstore.GetMasterNode(),
3863 self.instance.primary_node,
3865 if self.op.remote_node is not None:
3866 nl.append(self.op.remote_node)
3869 def CheckPrereq(self):
3870 """Check prerequisites.
3872 This checks that the instance is in the cluster.
3875 if not hasattr(self.op, "remote_node"):
3876 self.op.remote_node = None
3878 instance = self.cfg.GetInstanceInfo(
3879 self.cfg.ExpandInstanceName(self.op.instance_name))
3880 if instance is None:
3881 raise errors.OpPrereqError("Instance '%s' not known" %
3882 self.op.instance_name)
3883 self.instance = instance
3884 self.op.instance_name = instance.name
3886 if instance.disk_template not in constants.DTS_NET_MIRROR:
3887 raise errors.OpPrereqError("Instance's disk layout is not"
3888 " network mirrored.")
3890 if len(instance.secondary_nodes) != 1:
3891 raise errors.OpPrereqError("The instance has a strange layout,"
3892 " expected one secondary but found %d" %
3893 len(instance.secondary_nodes))
3895 self.sec_node = instance.secondary_nodes[0]
3897 ia_name = getattr(self.op, "iallocator", None)
3898 if ia_name is not None:
3899 if self.op.remote_node is not None:
3900 raise errors.OpPrereqError("Give either the iallocator or the new"
3901 " secondary, not both")
3902 self.op.remote_node = self._RunAllocator()
3904 remote_node = self.op.remote_node
3905 if remote_node is not None:
3906 remote_node = self.cfg.ExpandNodeName(remote_node)
3907 if remote_node is None:
3908 raise errors.OpPrereqError("Node '%s' not known" %
3909 self.op.remote_node)
3910 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3912 self.remote_node_info = None
3913 if remote_node == instance.primary_node:
3914 raise errors.OpPrereqError("The specified node is the primary node of"
3916 elif remote_node == self.sec_node:
3917 if self.op.mode == constants.REPLACE_DISK_SEC:
3918 # this is for DRBD8, where we can't execute the same mode of
3919 # replacement as for drbd7 (no different port allocated)
3920 raise errors.OpPrereqError("Same secondary given, cannot execute"
3922 # the user gave the current secondary, switch to
3923 # 'no-replace-secondary' mode for drbd7
3925 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3926 self.op.mode != constants.REPLACE_DISK_ALL):
3927 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3928 " disks replacement, not individual ones")
3929 if instance.disk_template == constants.DT_DRBD8:
3930 if (self.op.mode == constants.REPLACE_DISK_ALL and
3931 remote_node is not None):
3932 # switch to replace secondary mode
3933 self.op.mode = constants.REPLACE_DISK_SEC
3935 if self.op.mode == constants.REPLACE_DISK_ALL:
3936 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3937 " secondary disk replacement, not"
3939 elif self.op.mode == constants.REPLACE_DISK_PRI:
3940 if remote_node is not None:
3941 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3942 " the secondary while doing a primary"
3943 " node disk replacement")
3944 self.tgt_node = instance.primary_node
3945 self.oth_node = instance.secondary_nodes[0]
3946 elif self.op.mode == constants.REPLACE_DISK_SEC:
3947 self.new_node = remote_node # this can be None, in which case
3948 # we don't change the secondary
3949 self.tgt_node = instance.secondary_nodes[0]
3950 self.oth_node = instance.primary_node
3952 raise errors.ProgrammerError("Unhandled disk replace mode")
3954 for name in self.op.disks:
3955 if instance.FindDisk(name) is None:
3956 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3957 (name, instance.name))
3958 self.op.remote_node = remote_node
3960 def _ExecRR1(self, feedback_fn):
3961 """Replace the disks of an instance.
3964 instance = self.instance
3967 if self.op.remote_node is None:
3968 remote_node = self.sec_node
3970 remote_node = self.op.remote_node
3972 for dev in instance.disks:
3974 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3975 names = _GenerateUniqueNames(cfg, lv_names)
3976 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3977 remote_node, size, names)
3978 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3979 logger.Info("adding new mirror component on secondary for %s" %
3982 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3984 _GetInstanceInfoText(instance)):
3985 raise errors.OpExecError("Failed to create new component on secondary"
3986 " node %s. Full abort, cleanup manually!" %
3989 logger.Info("adding new mirror component on primary")
3991 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3993 _GetInstanceInfoText(instance)):
3994 # remove secondary dev
3995 cfg.SetDiskID(new_drbd, remote_node)
3996 rpc.call_blockdev_remove(remote_node, new_drbd)
3997 raise errors.OpExecError("Failed to create volume on primary!"
3998 " Full abort, cleanup manually!!")
4000 # the device exists now
4001 # call the primary node to add the mirror to md
4002 logger.Info("adding new mirror component to md")
4003 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
4005 logger.Error("Can't add mirror component to md!")
4006 cfg.SetDiskID(new_drbd, remote_node)
4007 if not rpc.call_blockdev_remove(remote_node, new_drbd):
4008 logger.Error("Can't rollback on secondary")
4009 cfg.SetDiskID(new_drbd, instance.primary_node)
4010 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4011 logger.Error("Can't rollback on primary")
4012 raise errors.OpExecError("Full abort, cleanup manually!!")
4014 dev.children.append(new_drbd)
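# note: AddInstance is used here (and again at the end of this method) as an
# update - it re-writes the modified instance object, now carrying the new
# DRBD child, into the cluster configuration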
4015 cfg.AddInstance(instance)
4017 # this can fail as the old devices are degraded and _WaitForSync
4018 # does a combined result over all disks, so we don't check its
4019 # return value
4020 _WaitForSync(cfg, instance, self.proc, unlock=True)
4022 # so check manually all the devices
4023 for name in iv_names:
4024 dev, child, new_drbd = iv_names[name]
4025 cfg.SetDiskID(dev, instance.primary_node)
4026 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4027 if is_degr:
4028 raise errors.OpExecError("MD device %s is degraded!" % name)
4029 cfg.SetDiskID(new_drbd, instance.primary_node)
4030 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
4031 if is_degr:
4032 raise errors.OpExecError("New drbd device %s is degraded!" % name)
4034 for name in iv_names:
4035 dev, child, new_drbd = iv_names[name]
4036 logger.Info("remove mirror %s component" % name)
4037 cfg.SetDiskID(dev, instance.primary_node)
4038 if not rpc.call_blockdev_removechildren(instance.primary_node,
4039 dev, [child]):
4040 logger.Error("Can't remove child from mirror, aborting"
4041 " *this device cleanup*.\nYou need to cleanup manually!!")
4044 for node in child.logical_id[:2]:
4045 logger.Info("remove child device on %s" % node)
4046 cfg.SetDiskID(child, node)
4047 if not rpc.call_blockdev_remove(node, child):
4048 logger.Error("Warning: failed to remove device from node %s,"
4049 " continuing operation." % node)
4051 dev.children.remove(child)
4053 cfg.AddInstance(instance)
4055 def _ExecD8DiskOnly(self, feedback_fn):
4056 """Replace a disk on the primary or secondary for drbd8.
4058 The algorithm for replace is quite complicated:
4059 - for each disk to be replaced:
4060 - create new LVs on the target node with unique names
4061 - detach old LVs from the drbd device
4062 - rename old LVs to name_replaced.<time_t>
4063 - rename new LVs to old LVs
4064 - attach the new LVs (with the old names now) to the drbd device
4065 - wait for sync across all devices
4066 - for each modified disk:
4067 - remove old LVs (which have the name name_replaced.<time_t>)
4069 Failures are not very well handled.
4073 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4074 instance = self.instance
4076 vgname = self.cfg.GetVGName()
4079 tgt_node = self.tgt_node
4080 oth_node = self.oth_node
4082 # Step: check device activation
4083 self.proc.LogStep(1, steps_total, "check device existence")
4084 info("checking volume groups")
4085 my_vg = cfg.GetVGName()
4086 results = rpc.call_vg_list([oth_node, tgt_node])
4087 if not results:
4088 raise errors.OpExecError("Can't list volume groups on the nodes")
4089 for node in oth_node, tgt_node:
4090 res = results.get(node, False)
4091 if not res or my_vg not in res:
4092 raise errors.OpExecError("Volume group '%s' not found on %s" %
4093 (my_vg, node))
4094 for dev in instance.disks:
4095 if not dev.iv_name in self.op.disks:
4096 continue
4097 for node in tgt_node, oth_node:
4098 info("checking %s on %s" % (dev.iv_name, node))
4099 cfg.SetDiskID(dev, node)
4100 if not rpc.call_blockdev_find(node, dev):
4101 raise errors.OpExecError("Can't find device %s on node %s" %
4102 (dev.iv_name, node))
4104 # Step: check other node consistency
4105 self.proc.LogStep(2, steps_total, "check peer consistency")
4106 for dev in instance.disks:
4107 if not dev.iv_name in self.op.disks:
4108 continue
4109 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4110 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4111 oth_node==instance.primary_node):
4112 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4113 " to replace disks on this node (%s)" %
4114 (oth_node, tgt_node))
4116 # Step: create new storage
4117 self.proc.LogStep(3, steps_total, "allocate new storage")
4118 for dev in instance.disks:
4119 if not dev.iv_name in self.op.disks:
4120 continue
4121 size = dev.size
4122 cfg.SetDiskID(dev, tgt_node)
4123 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4124 names = _GenerateUniqueNames(cfg, lv_names)
4125 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4126 logical_id=(vgname, names[0]))
4127 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4128 logical_id=(vgname, names[1]))
4129 new_lvs = [lv_data, lv_meta]
4130 old_lvs = dev.children
4131 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4132 info("creating new local storage on %s for %s" %
4133 (tgt_node, dev.iv_name))
4134 # since we *always* want to create this LV, we use the
4135 # _Create...OnPrimary (which forces the creation), even if we
4136 # are talking about the secondary node
4137 for new_lv in new_lvs:
4138 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4139 _GetInstanceInfoText(instance)):
4140 raise errors.OpExecError("Failed to create new LV named '%s' on"
4141 " node %s" %
4142 (new_lv.logical_id[1], tgt_node))
4144 # Step: for each lv, detach+rename*2+attach
4145 self.proc.LogStep(4, steps_total, "change drbd configuration")
4146 for dev, old_lvs, new_lvs in iv_names.itervalues():
4147 info("detaching %s drbd from local storage" % dev.iv_name)
4148 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4149 raise errors.OpExecError("Can't detach drbd from local storage on node"
4150 " %s for device %s" % (tgt_node, dev.iv_name))
4152 #cfg.Update(instance)
4154 # ok, we created the new LVs, so now we know we have the needed
4155 # storage; as such, we proceed on the target node to rename
4156 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4157 # using the assumption that logical_id == physical_id (which in
4158 # turn is the unique_id on that node)
4160 # FIXME(iustin): use a better name for the replaced LVs
4161 temp_suffix = int(time.time())
4162 ren_fn = lambda d, suff: (d.physical_id[0],
4163 d.physical_id[1] + "_replaced-%s" % suff)
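# illustration (names are made up): an old LV with physical_id
# ("xenvg", "sda_data") and temp_suffix 1199145600 would be scheduled for
# renaming to ("xenvg", "sda_data_replaced-1199145600")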
4164 # build the rename list based on what LVs exist on the node
4165 rlist = []
4166 for to_ren in old_lvs:
4167 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4168 if find_res is not None: # device exists
4169 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4171 info("renaming the old LVs on the target node")
4172 if not rpc.call_blockdev_rename(tgt_node, rlist):
4173 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4174 # now we rename the new LVs to the old LVs
4175 info("renaming the new LVs on the target node")
4176 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4177 if not rpc.call_blockdev_rename(tgt_node, rlist):
4178 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4180 for old, new in zip(old_lvs, new_lvs):
4181 new.logical_id = old.logical_id
4182 cfg.SetDiskID(new, tgt_node)
4184 for disk in old_lvs:
4185 disk.logical_id = ren_fn(disk, temp_suffix)
4186 cfg.SetDiskID(disk, tgt_node)
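# at this point the new LVs carry the original names and the old LVs carry
# the "_replaced-<timestamp>" names, so the "remove old storage" step below
# only needs to delete old_lvs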
4188 # now that the new lvs have the old name, we can add them to the device
4189 info("adding new mirror component on %s" % tgt_node)
4190 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4191 for new_lv in new_lvs:
4192 if not rpc.call_blockdev_remove(tgt_node, new_lv):
4193 warning("Can't rollback device %s", hint="manually cleanup unused"
4195 raise errors.OpExecError("Can't add local storage to drbd")
4197 dev.children = new_lvs
4198 cfg.Update(instance)
4200 # Step: wait for sync
4202 # this can fail as the old devices are degraded and _WaitForSync
4203 # does a combined result over all disks, so we don't check its
4204 # return value
4205 self.proc.LogStep(5, steps_total, "sync devices")
4206 _WaitForSync(cfg, instance, self.proc, unlock=True)
4208 # so check manually all the devices
4209 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4210 cfg.SetDiskID(dev, instance.primary_node)
4211 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4212 if is_degr:
4213 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4215 # Step: remove old storage
4216 self.proc.LogStep(6, steps_total, "removing old storage")
4217 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4218 info("remove logical volumes for %s" % name)
4219 for lv in old_lvs:
4220 cfg.SetDiskID(lv, tgt_node)
4221 if not rpc.call_blockdev_remove(tgt_node, lv):
4222 warning("Can't remove old LV", hint="manually remove unused LVs")
4225 def _ExecD8Secondary(self, feedback_fn):
4226 """Replace the secondary node for drbd8.
4228 The algorithm for replace is quite complicated:
4229 - for all disks of the instance:
4230 - create new LVs on the new node with same names
4231 - shutdown the drbd device on the old secondary
4232 - disconnect the drbd network on the primary
4233 - create the drbd device on the new secondary
4234 - network attach the drbd on the primary, using an artifice:
4235 the drbd code for Attach() will connect to the network if it
4236 finds a device which is connected to the good local disks but
4237 not network enabled
4238 - wait for sync across all devices
4239 - remove all disks from the old secondary
4241 Failures are not very well handled.
4245 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4246 instance = self.instance
4248 vgname = self.cfg.GetVGName()
4251 old_node = self.tgt_node
4252 new_node = self.new_node
4253 pri_node = instance.primary_node
4255 # Step: check device activation
4256 self.proc.LogStep(1, steps_total, "check device existence")
4257 info("checking volume groups")
4258 my_vg = cfg.GetVGName()
4259 results = rpc.call_vg_list([pri_node, new_node])
4260 if not results:
4261 raise errors.OpExecError("Can't list volume groups on the nodes")
4262 for node in pri_node, new_node:
4263 res = results.get(node, False)
4264 if not res or my_vg not in res:
4265 raise errors.OpExecError("Volume group '%s' not found on %s" %
4266 (my_vg, node))
4267 for dev in instance.disks:
4268 if not dev.iv_name in self.op.disks:
4269 continue
4270 info("checking %s on %s" % (dev.iv_name, pri_node))
4271 cfg.SetDiskID(dev, pri_node)
4272 if not rpc.call_blockdev_find(pri_node, dev):
4273 raise errors.OpExecError("Can't find device %s on node %s" %
4274 (dev.iv_name, pri_node))
4276 # Step: check other node consistency
4277 self.proc.LogStep(2, steps_total, "check peer consistency")
4278 for dev in instance.disks:
4279 if not dev.iv_name in self.op.disks:
4280 continue
4281 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4282 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4283 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4284 " unsafe to replace the secondary" %
4287 # Step: create new storage
4288 self.proc.LogStep(3, steps_total, "allocate new storage")
4289 for dev in instance.disks:
4291 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4292 # since we *always* want to create this LV, we use the
4293 # _Create...OnPrimary (which forces the creation), even if we
4294 # are talking about the secondary node
4295 for new_lv in dev.children:
4296 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4297 _GetInstanceInfoText(instance)):
4298 raise errors.OpExecError("Failed to create new LV named '%s' on"
4299 " node %s" %
4300 (new_lv.logical_id[1], new_node))
4302 iv_names[dev.iv_name] = (dev, dev.children)
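# remember each disk together with its current LV children; LVs with the
# same names now also exist on the new node, and the copies left on the old
# secondary are removed in the last step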
4304 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4305 for dev in instance.disks:
4307 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4308 # create new devices on new_node
4309 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4310 logical_id=(pri_node, new_node,
4312 children=dev.children)
4313 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4315 _GetInstanceInfoText(instance)):
4316 raise errors.OpExecError("Failed to create new DRBD on"
4317 " node '%s'" % new_node)
4319 for dev in instance.disks:
4320 # we have new devices, shutdown the drbd on the old secondary
4321 info("shutting down drbd for %s on old node" % dev.iv_name)
4322 cfg.SetDiskID(dev, old_node)
4323 if not rpc.call_blockdev_shutdown(old_node, dev):
4324 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4325 hint="Please cleanup this device manually as soon as possible")
4327 info("detaching primary drbds from the network (=> standalone)")
4329 for dev in instance.disks:
4330 cfg.SetDiskID(dev, pri_node)
4331 # set the physical (unique in bdev terms) id to None, meaning
4332 # detach from network
4333 dev.physical_id = (None,) * len(dev.physical_id)
4334 # and 'find' the device, which will 'fix' it to match the
4335 # standalone state
4336 if rpc.call_blockdev_find(pri_node, dev):
4339 warning("Failed to detach drbd %s from network, unusual case" %
4343 # no detaches succeeded (very unlikely)
4344 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4346 # if we managed to detach at least one, we update all the disks of
4347 # the instance to point to the new secondary
4348 info("updating instance configuration")
4349 for dev in instance.disks:
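# for DRBD8 devices the logical_id starts with the two node names; only
# those are replaced here, the remainder of the tuple (e.g. the port) is
# kept unchanged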
4350 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4351 cfg.SetDiskID(dev, pri_node)
4352 cfg.Update(instance)
4354 # and now perform the drbd attach
4355 info("attaching primary drbds to new secondary (standalone => connected)")
4357 for dev in instance.disks:
4358 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4359 # since the attach is smart, it's enough to 'find' the device,
4360 # it will automatically activate the network, if the physical_id
4362 cfg.SetDiskID(dev, pri_node)
4363 if not rpc.call_blockdev_find(pri_node, dev):
4364 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4365 "please do a gnt-instance info to see the status of disks")
4367 # this can fail as the old devices are degraded and _WaitForSync
4368 # does a combined result over all disks, so we don't check its
4369 # return value
4370 self.proc.LogStep(5, steps_total, "sync devices")
4371 _WaitForSync(cfg, instance, self.proc, unlock=True)
4373 # so check manually all the devices
4374 for name, (dev, old_lvs) in iv_names.iteritems():
4375 cfg.SetDiskID(dev, pri_node)
4376 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4377 if is_degr:
4378 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4380 self.proc.LogStep(6, steps_total, "removing old storage")
4381 for name, (dev, old_lvs) in iv_names.iteritems():
4382 info("remove logical volumes for %s" % name)
4383 for lv in old_lvs:
4384 cfg.SetDiskID(lv, old_node)
4385 if not rpc.call_blockdev_remove(old_node, lv):
4386 warning("Can't remove LV on old secondary",
4387 hint="Cleanup stale volumes by hand")
4389 def Exec(self, feedback_fn):
4390 """Execute disk replacement.
4392 This dispatches the disk replacement to the appropriate handler.
4395 instance = self.instance
4397 # Activate the instance disks if we're replacing them on a down instance
4398 if instance.status == "down":
4399 op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
4400 self.proc.ChainOpCode(op)
4402 if instance.disk_template == constants.DT_REMOTE_RAID1:
4403 fn = self._ExecRR1
4404 elif instance.disk_template == constants.DT_DRBD8:
4405 if self.op.remote_node is None:
4406 fn = self._ExecD8DiskOnly
4407 else:
4408 fn = self._ExecD8Secondary
4409 else:
4410 raise errors.ProgrammerError("Unhandled disk replacement case")
4412 ret = fn(feedback_fn)
4414 # Deactivate the instance disks if we're replacing them on a down instance
4415 if instance.status == "down":
4416 op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
4417 self.proc.ChainOpCode(op)
4422 class LUGrowDisk(LogicalUnit):
4423 """Grow a disk of an instance.
4427 HTYPE = constants.HTYPE_INSTANCE
4428 _OP_REQP = ["instance_name", "disk", "amount"]
4430 def BuildHooksEnv(self):
4433 This runs on the master, the primary and all the secondaries.
4437 "DISK": self.op.disk,
4438 "AMOUNT": self.op.amount,
4440 env.update(_BuildInstanceHookEnvByObject(self.instance))
4442 self.sstore.GetMasterNode(),
4443 self.instance.primary_node,
4447 def CheckPrereq(self):
4448 """Check prerequisites.
4450 This checks that the instance is in the cluster.
4453 instance = self.cfg.GetInstanceInfo(
4454 self.cfg.ExpandInstanceName(self.op.instance_name))
4455 if instance is None:
4456 raise errors.OpPrereqError("Instance '%s' not known" %
4457 self.op.instance_name)
4458 self.instance = instance
4459 self.op.instance_name = instance.name
4461 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4462 raise errors.OpPrereqError("Instance's disk layout does not support"
4465 if instance.FindDisk(self.op.disk) is None:
4466 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4467 (self.op.disk, instance.name))
4469 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4470 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4471 for node in nodenames:
4472 info = nodeinfo.get(node, None)
4473 if not info:
4474 raise errors.OpPrereqError("Cannot get current information"
4475 " from node '%s'" % node)
4476 vg_free = info.get('vg_free', None)
4477 if not isinstance(vg_free, int):
4478 raise errors.OpPrereqError("Can't compute free disk space on"
4479 " node %s" % node)
4480 if self.op.amount > info['vg_free']:
4481 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4482 " %d MiB available, %d MiB required" %
4483 (node, info['vg_free'], self.op.amount))
4485 def Exec(self, feedback_fn):
4486 """Execute disk grow.
4489 instance = self.instance
4490 disk = instance.FindDisk(self.op.disk)
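# the grow has to be performed on every node the disk lives on (all
# secondaries plus the primary); the new size is recorded in the
# configuration only after all per-node grow calls have succeeded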
4491 for node in (instance.secondary_nodes + (instance.primary_node,)):
4492 self.cfg.SetDiskID(disk, node)
4493 result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4494 if not result or not isinstance(result, tuple) or len(result) != 2:
4495 raise errors.OpExecError("grow request failed to node %s" % node)
4496 elif not result[0]:
4497 raise errors.OpExecError("grow request failed to node %s: %s" %
4498 (node, result[1]))
4499 disk.RecordGrow(self.op.amount)
4500 self.cfg.Update(instance)
4504 class LUQueryInstanceData(NoHooksLU):
4505 """Query runtime instance data.
4508 _OP_REQP = ["instances"]
4510 def CheckPrereq(self):
4511 """Check prerequisites.
4513 This only checks the optional instance list against the existing names.
4516 if not isinstance(self.op.instances, list):
4517 raise errors.OpPrereqError("Invalid argument type 'instances'")
4518 if self.op.instances:
4519 self.wanted_instances = []
4520 names = self.op.instances
4521 for name in names:
4522 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4523 if instance is None:
4524 raise errors.OpPrereqError("No such instance name '%s'" % name)
4525 self.wanted_instances.append(instance)
4526 else:
4527 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4528 in self.cfg.GetInstanceList()]
4532 def _ComputeDiskStatus(self, instance, snode, dev):
4533 """Compute block device status.
4536 self.cfg.SetDiskID(dev, instance.primary_node)
4537 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4538 if dev.dev_type in constants.LDS_DRBD:
4539 # we change the snode then (otherwise we use the one passed in)
4540 if dev.logical_id[0] == instance.primary_node:
4541 snode = dev.logical_id[1]
4543 snode = dev.logical_id[0]
4546 self.cfg.SetDiskID(dev, snode)
4547 dev_sstatus = rpc.call_blockdev_find(snode, dev)
4552 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4553 for child in dev.children]
4558 "iv_name": dev.iv_name,
4559 "dev_type": dev.dev_type,
4560 "logical_id": dev.logical_id,
4561 "physical_id": dev.physical_id,
4562 "pstatus": dev_pstatus,
4563 "sstatus": dev_sstatus,
4564 "children": dev_children,
4569 def Exec(self, feedback_fn):
4570 """Gather and return data"""
4571 result = {}
4572 for instance in self.wanted_instances:
4573 remote_info = rpc.call_instance_info(instance.primary_node,
4574 instance.name)
4575 if remote_info and "state" in remote_info:
4576 remote_state = "up"
4577 else:
4578 remote_state = "down"
4579 if instance.status == "down":
4580 config_state = "down"
4584 disks = [self._ComputeDiskStatus(instance, None, device)
4585 for device in instance.disks]
4588 "name": instance.name,
4589 "config_state": config_state,
4590 "run_state": remote_state,
4591 "pnode": instance.primary_node,
4592 "snodes": instance.secondary_nodes,
4594 "memory": instance.memory,
4595 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4597 "vcpus": instance.vcpus,
4600 htkind = self.sstore.GetHypervisorType()
4601 if htkind == constants.HT_XEN_PVM30:
4602 idict["kernel_path"] = instance.kernel_path
4603 idict["initrd_path"] = instance.initrd_path
4605 if htkind == constants.HT_XEN_HVM31:
4606 idict["hvm_boot_order"] = instance.hvm_boot_order
4607 idict["hvm_acpi"] = instance.hvm_acpi
4608 idict["hvm_pae"] = instance.hvm_pae
4609 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4611 if htkind in constants.HTS_REQ_PORT:
4612 idict["vnc_bind_address"] = instance.vnc_bind_address
4613 idict["network_port"] = instance.network_port
4615 result[instance.name] = idict
4620 class LUSetInstanceParms(LogicalUnit):
4621 """Modifies an instance's parameters.
4624 HPATH = "instance-modify"
4625 HTYPE = constants.HTYPE_INSTANCE
4626 _OP_REQP = ["instance_name"]
4628 def BuildHooksEnv(self):
4631 This runs on the master, primary and secondaries.
4636 args['memory'] = self.mem
4638 args['vcpus'] = self.vcpus
4639 if self.do_ip or self.do_bridge or self.mac:
4643 ip = self.instance.nics[0].ip
4645 bridge = self.bridge
4647 bridge = self.instance.nics[0].bridge
4651 mac = self.instance.nics[0].mac
4652 args['nics'] = [(ip, bridge, mac)]
4653 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4654 nl = [self.sstore.GetMasterNode(),
4655 self.instance.primary_node] + list(self.instance.secondary_nodes)
4658 def CheckPrereq(self):
4659 """Check prerequisites.
4661 This only checks the instance list against the existing names.
4664 self.mem = getattr(self.op, "mem", None)
4665 self.vcpus = getattr(self.op, "vcpus", None)
4666 self.ip = getattr(self.op, "ip", None)
4667 self.mac = getattr(self.op, "mac", None)
4668 self.bridge = getattr(self.op, "bridge", None)
4669 self.kernel_path = getattr(self.op, "kernel_path", None)
4670 self.initrd_path = getattr(self.op, "initrd_path", None)
4671 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4672 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4673 self.hvm_pae = getattr(self.op, "hvm_pae", None)
4674 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4675 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4676 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4677 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4678 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4679 self.vnc_bind_address]
4680 if all_parms.count(None) == len(all_parms):
4681 raise errors.OpPrereqError("No changes submitted")
4682 if self.mem is not None:
4683 try:
4684 self.mem = int(self.mem)
4685 except ValueError, err:
4686 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4687 if self.vcpus is not None:
4688 try:
4689 self.vcpus = int(self.vcpus)
4690 except ValueError, err:
4691 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4692 if self.ip is not None:
4693 self.do_ip = True
4694 if self.ip.lower() == "none":
4695 self.ip = None
4696 else:
4697 if not utils.IsValidIP(self.ip):
4698 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4699 else:
4700 self.do_ip = False
4701 self.do_bridge = (self.bridge is not None)
4702 if self.mac is not None:
4703 if self.cfg.IsMacInUse(self.mac):
4704 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4705 self.mac)
4706 if not utils.IsValidMac(self.mac):
4707 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4709 if self.kernel_path is not None:
4710 self.do_kernel_path = True
4711 if self.kernel_path == constants.VALUE_NONE:
4712 raise errors.OpPrereqError("Can't set instance to no kernel")
4714 if self.kernel_path != constants.VALUE_DEFAULT:
4715 if not os.path.isabs(self.kernel_path):
4716 raise errors.OpPrereqError("The kernel path must be an absolute"
4717 " filename")
4718 else:
4719 self.do_kernel_path = False
4721 if self.initrd_path is not None:
4722 self.do_initrd_path = True
4723 if self.initrd_path not in (constants.VALUE_NONE,
4724 constants.VALUE_DEFAULT):
4725 if not os.path.isabs(self.initrd_path):
4726 raise errors.OpPrereqError("The initrd path must be an absolute"
4727 " filename")
4728 else:
4729 self.do_initrd_path = False
4731 # boot order verification
4732 if self.hvm_boot_order is not None:
4733 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4734 if len(self.hvm_boot_order.strip("acdn")) != 0:
4735 raise errors.OpPrereqError("invalid boot order specified,"
4736 " must be one or more of [acdn]"
4739 # hvm_cdrom_image_path verification
4740 if self.op.hvm_cdrom_image_path is not None:
4741 if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4742 self.op.hvm_cdrom_image_path.lower() == "none"):
4743 raise errors.OpPrereqError("The path to the HVM CDROM image must"
4744 " be an absolute path or None, not %s" %
4745 self.op.hvm_cdrom_image_path)
4746 if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4747 self.op.hvm_cdrom_image_path.lower() == "none"):
4748 raise errors.OpPrereqError("The HVM CDROM image must either be a"
4749 " regular file or a symlink pointing to"
4750 " an existing regular file, not %s" %
4751 self.op.hvm_cdrom_image_path)
4753 # vnc_bind_address verification
4754 if self.op.vnc_bind_address is not None:
4755 if not utils.IsValidIP(self.op.vnc_bind_address):
4756 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4757 " like a valid IP address" %
4758 self.op.vnc_bind_address)
4760 instance = self.cfg.GetInstanceInfo(
4761 self.cfg.ExpandInstanceName(self.op.instance_name))
4762 if instance is None:
4763 raise errors.OpPrereqError("No such instance name '%s'" %
4764 self.op.instance_name)
4765 self.op.instance_name = instance.name
4766 self.instance = instance
4769 def Exec(self, feedback_fn):
4770 """Modifies an instance.
4772 All parameters take effect only at the next restart of the instance.
4775 instance = self.instance
4777 instance.memory = self.mem
4778 result.append(("mem", self.mem))
4780 instance.vcpus = self.vcpus
4781 result.append(("vcpus", self.vcpus))
4783 instance.nics[0].ip = self.ip
4784 result.append(("ip", self.ip))
4786 instance.nics[0].bridge = self.bridge
4787 result.append(("bridge", self.bridge))
4789 instance.nics[0].mac = self.mac
4790 result.append(("mac", self.mac))
4791 if self.do_kernel_path:
4792 instance.kernel_path = self.kernel_path
4793 result.append(("kernel_path", self.kernel_path))
4794 if self.do_initrd_path:
4795 instance.initrd_path = self.initrd_path
4796 result.append(("initrd_path", self.initrd_path))
4797 if self.hvm_boot_order:
4798 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4799 instance.hvm_boot_order = None
4801 instance.hvm_boot_order = self.hvm_boot_order
4802 result.append(("hvm_boot_order", self.hvm_boot_order))
4803 if self.hvm_acpi is not None:
4804 instance.hvm_acpi = self.hvm_acpi
4805 result.append(("hvm_acpi", self.hvm_acpi))
4806 if self.hvm_pae is not None:
4807 instance.hvm_pae = self.hvm_pae
4808 result.append(("hvm_pae", self.hvm_pae))
4809 if self.hvm_cdrom_image_path:
4810 if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4811 instance.hvm_cdrom_image_path = None
4813 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4814 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4815 if self.vnc_bind_address:
4816 instance.vnc_bind_address = self.vnc_bind_address
4817 result.append(("vnc_bind_address", self.vnc_bind_address))
4819 self.cfg.AddInstance(instance)
4824 class LUQueryExports(NoHooksLU):
4825 """Query the exports list
4830 def CheckPrereq(self):
4831 """Check that the nodelist contains only existing nodes.
4834 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4836 def Exec(self, feedback_fn):
4837 """Compute the list of all the exported system images.
4840 a dictionary with the structure node->(export-list)
4841 where export-list is a list of the instances exported on
4845 return rpc.call_export_list(self.nodes)
4848 class LUExportInstance(LogicalUnit):
4849 """Export an instance to an image in the cluster.
4852 HPATH = "instance-export"
4853 HTYPE = constants.HTYPE_INSTANCE
4854 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4856 def BuildHooksEnv(self):
4859 This will run on the master, primary node and target node.
4863 "EXPORT_NODE": self.op.target_node,
4864 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4866 env.update(_BuildInstanceHookEnvByObject(self.instance))
4867 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4868 self.op.target_node]
4871 def CheckPrereq(self):
4872 """Check prerequisites.
4874 This checks that the instance and node names are valid.
4877 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4878 self.instance = self.cfg.GetInstanceInfo(instance_name)
4879 if self.instance is None:
4880 raise errors.OpPrereqError("Instance '%s' not found" %
4881 self.op.instance_name)
4884 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4885 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4887 if self.dst_node is None:
4888 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4889 self.op.target_node)
4890 self.op.target_node = self.dst_node.name
4892 def Exec(self, feedback_fn):
4893 """Export an instance to an image in the cluster.
4896 instance = self.instance
4897 dst_node = self.dst_node
4898 src_node = instance.primary_node
4899 if self.op.shutdown:
4900 # shutdown the instance, but not the disks
4901 if not rpc.call_instance_shutdown(src_node, instance):
4902 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4903 (instance.name, src_node))
4905 vgname = self.cfg.GetVGName()
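# snapshot the disks on the source node; only the 'sda' disk is actually
# snapshotted and exported below, and the snapshot LV is removed again once
# it has been exported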
snap_disks = []
4910 for disk in instance.disks:
4911 if disk.iv_name == "sda":
4912 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4913 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4915 if not new_dev_name:
4916 logger.Error("could not snapshot block device %s on node %s" %
4917 (disk.logical_id[1], src_node))
4919 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4920 logical_id=(vgname, new_dev_name),
4921 physical_id=(vgname, new_dev_name),
4922 iv_name=disk.iv_name)
4923 snap_disks.append(new_dev)
4926 if self.op.shutdown and instance.status == "up":
4927 if not rpc.call_instance_start(src_node, instance, None):
4928 _ShutdownInstanceDisks(instance, self.cfg)
4929 raise errors.OpExecError("Could not start instance")
4931 # TODO: check for size
4933 for dev in snap_disks:
4934 if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4936 logger.Error("could not export block device %s from node"
4938 (dev.logical_id[1], src_node, dst_node.name))
4939 if not rpc.call_blockdev_remove(src_node, dev):
4940 logger.Error("could not remove snapshot block device %s from"
4941 " node %s" % (dev.logical_id[1], src_node))
4943 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4944 logger.Error("could not finalize export for instance %s on node %s" %
4945 (instance.name, dst_node.name))
4947 nodelist = self.cfg.GetNodeList()
4948 nodelist.remove(dst_node.name)
4950 # on one-node clusters nodelist will be empty after the removal
4951 # if we proceed the backup would be removed because OpQueryExports
4952 # substitutes an empty list with the full cluster node list.
4953 if nodelist:
4954 op = opcodes.OpQueryExports(nodes=nodelist)
4955 exportlist = self.proc.ChainOpCode(op)
4956 for node in exportlist:
4957 if instance.name in exportlist[node]:
4958 if not rpc.call_export_remove(node, instance.name):
4959 logger.Error("could not remove older export for instance %s"
4960 " on node %s" % (instance.name, node))
4963 class LURemoveExport(NoHooksLU):
4964 """Remove exports related to the named instance.
4967 _OP_REQP = ["instance_name"]
4969 def CheckPrereq(self):
4970 """Check prerequisites.
4974 def Exec(self, feedback_fn):
4975 """Remove any export.
4978 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4979 # If the instance was not found we'll try with the name that was passed in.
4980 # This will only work if it was an FQDN, though.
4981 fqdn_warn = False
4982 if not instance_name:
4983 fqdn_warn = True
4984 instance_name = self.op.instance_name
4986 op = opcodes.OpQueryExports(nodes=[])
4987 exportlist = self.proc.ChainOpCode(op)
4988 found = False
4989 for node in exportlist:
4990 if instance_name in exportlist[node]:
4991 found = True
4992 if not rpc.call_export_remove(node, instance_name):
4993 logger.Error("could not remove export for instance %s"
4994 " on node %s" % (instance_name, node))
4996 if fqdn_warn and not found:
4997 feedback_fn("Export not found. If trying to remove an export belonging"
4998 " to a deleted instance please use its Fully Qualified"
5002 class TagsLU(NoHooksLU):
5005 This is an abstract class which is the parent of all the other tags LUs.
5008 def CheckPrereq(self):
5009 """Check prerequisites.
5012 if self.op.kind == constants.TAG_CLUSTER:
5013 self.target = self.cfg.GetClusterInfo()
5014 elif self.op.kind == constants.TAG_NODE:
5015 name = self.cfg.ExpandNodeName(self.op.name)
5016 if name is None:
5017 raise errors.OpPrereqError("Invalid node name (%s)" %
5020 self.target = self.cfg.GetNodeInfo(name)
5021 elif self.op.kind == constants.TAG_INSTANCE:
5022 name = self.cfg.ExpandInstanceName(self.op.name)
5023 if name is None:
5024 raise errors.OpPrereqError("Invalid instance name (%s)" %
5027 self.target = self.cfg.GetInstanceInfo(name)
5028 else:
5029 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5033 class LUGetTags(TagsLU):
5034 """Returns the tags of a given object.
5037 _OP_REQP = ["kind", "name"]
5039 def Exec(self, feedback_fn):
5040 """Returns the tag list.
5043 return self.target.GetTags()
5046 class LUSearchTags(NoHooksLU):
5047 """Searches the tags for a given pattern.
5050 _OP_REQP = ["pattern"]
5052 def CheckPrereq(self):
5053 """Check prerequisites.
5055 This checks the pattern passed for validity by compiling it.
try:
5059 self.re = re.compile(self.op.pattern)
5060 except re.error, err:
5061 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5062 (self.op.pattern, err))
5064 def Exec(self, feedback_fn):
5065 """Returns the tag list.
5069 tgts = [("/cluster", cfg.GetClusterInfo())]
5070 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
5071 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5072 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
5073 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5074 results = []
5075 for path, target in tgts:
5076 for tag in target.GetTags():
5077 if self.re.search(tag):
5078 results.append((path, tag))
5082 class LUAddTags(TagsLU):
5083 """Sets a tag on a given object.
5086 _OP_REQP = ["kind", "name", "tags"]
5088 def CheckPrereq(self):
5089 """Check prerequisites.
5091 This checks the type and length of the tag name and value.
5094 TagsLU.CheckPrereq(self)
5095 for tag in self.op.tags:
5096 objects.TaggableObject.ValidateTag(tag)
5098 def Exec(self, feedback_fn):
try:
5103 for tag in self.op.tags:
5104 self.target.AddTag(tag)
5105 except errors.TagError, err:
5106 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5107 try:
5108 self.cfg.Update(self.target)
5109 except errors.ConfigurationError:
5110 raise errors.OpRetryError("There has been a modification to the"
5111 " config file and the operation has been"
5112 " aborted. Please retry.")
5115 class LUDelTags(TagsLU):
5116 """Delete a list of tags from a given object.
5119 _OP_REQP = ["kind", "name", "tags"]
5121 def CheckPrereq(self):
5122 """Check prerequisites.
5124 This checks that we have the given tag.
5127 TagsLU.CheckPrereq(self)
5128 for tag in self.op.tags:
5129 objects.TaggableObject.ValidateTag(tag)
5130 del_tags = frozenset(self.op.tags)
5131 cur_tags = self.target.GetTags()
5132 if not del_tags <= cur_tags:
5133 diff_tags = del_tags - cur_tags
5134 diff_names = ["'%s'" % tag for tag in diff_tags]
5136 raise errors.OpPrereqError("Tag(s) %s not found" %
5137 (",".join(diff_names)))
5139 def Exec(self, feedback_fn):
5140 """Remove the tag from the object.
5143 for tag in self.op.tags:
5144 self.target.RemoveTag(tag)
5145 try:
5146 self.cfg.Update(self.target)
5147 except errors.ConfigurationError:
5148 raise errors.OpRetryError("There has been a modification to the"
5149 " config file and the operation has been"
5150 " aborted. Please retry.")
5152 class LUTestDelay(NoHooksLU):
5153 """Sleep for a specified amount of time.
5155 This LU sleeps on the master and/or nodes for a specified amount of
time.
5159 _OP_REQP = ["duration", "on_master", "on_nodes"]
5161 def CheckPrereq(self):
5162 """Check prerequisites.
5164 This checks that we have a good list of nodes and/or the duration
5169 if self.op.on_nodes:
5170 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5172 def Exec(self, feedback_fn):
5173 """Do the actual sleep.
5176 if self.op.on_master:
5177 if not utils.TestDelay(self.op.duration):
5178 raise errors.OpExecError("Error during master delay test")
5179 if self.op.on_nodes:
5180 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5181 if not result:
5182 raise errors.OpExecError("Complete failure from rpc call")
5183 for node, node_result in result.items():
5184 if not node_result:
5185 raise errors.OpExecError("Failure during rpc call to node %s,"
5186 " result: %s" % (node, node_result))
5189 class IAllocator(object):
5190 """IAllocator framework.
5192 An IAllocator instance has three sets of attributes:
5193 - cfg/sstore that are needed to query the cluster
5194 - input data (all members of the _KEYS class attribute are required)
5195 - four buffer attributes (in|out_data|text), that represent the
5196 input (to the external script) in text and data structure format,
5197 and the output from it, again in two formats
5198 - the result variables from the script (success, info, nodes) for
5203 "mem_size", "disks", "disk_template",
5204 "os", "tags", "nics", "vcpus",
5210 def __init__(self, cfg, sstore, mode, name, **kwargs):
5211 self.cfg = cfg
5212 self.sstore = sstore
5213 # init buffer variables
5214 self.in_text = self.out_text = self.in_data = self.out_data = None
5215 # init all input fields so that pylint is happy
5216 self.mode = mode
5217 self.name = name
5218 self.mem_size = self.disks = self.disk_template = None
5219 self.os = self.tags = self.nics = self.vcpus = None
5220 self.relocate_from = None
5222 self.required_nodes = None
5223 # init result fields
5224 self.success = self.info = self.nodes = None
5225 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5226 keyset = self._ALLO_KEYS
5227 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5228 keyset = self._RELO_KEYS
5230 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5231 " IAllocator" % self.mode)
5232 for key in kwargs:
5233 if key not in keyset:
5234 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5235 " IAllocator" % key)
5236 setattr(self, key, kwargs[key])
5237 for key in keyset:
5238 if key not in kwargs:
5239 raise errors.ProgrammerError("Missing input parameter '%s' to"
5240 " IAllocator" % key)
5241 self._BuildInputData()
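# the textual input for the allocator script is prepared already at
# construction time; Run() only has to ship self.in_text to the master node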
5243 def _ComputeClusterData(self):
5244 """Compute the generic allocator input data.
5246 This is the data that is independent of the actual operation.
5253 "cluster_name": self.sstore.GetClusterName(),
5254 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5255 "hypervisor_type": self.sstore.GetHypervisorType(),
5256 # we don't have job IDs
5259 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5263 node_list = cfg.GetNodeList()
5264 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5265 for nname in node_list:
5266 ninfo = cfg.GetNodeInfo(nname)
5267 if nname not in node_data or not isinstance(node_data[nname], dict):
5268 raise errors.OpExecError("Can't get data for node %s" % nname)
5269 remote_info = node_data[nname]
5270 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5271 'vg_size', 'vg_free', 'cpu_total']:
5272 if attr not in remote_info:
5273 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5274 (nname, attr))
5275 try:
5276 remote_info[attr] = int(remote_info[attr])
5277 except ValueError, err:
5278 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5279 " %s" % (nname, attr, str(err)))
5280 # compute memory used by primary instances
5281 i_p_mem = i_p_up_mem = 0
5282 for iinfo in i_list:
5283 if iinfo.primary_node == nname:
5284 i_p_mem += iinfo.memory
5285 if iinfo.status == "up":
5286 i_p_up_mem += iinfo.memory
5288 # compute memory used by instances
5289 pnr = {
5290 "tags": list(ninfo.GetTags()),
5291 "total_memory": remote_info['memory_total'],
5292 "reserved_memory": remote_info['memory_dom0'],
5293 "free_memory": remote_info['memory_free'],
5294 "i_pri_memory": i_p_mem,
5295 "i_pri_up_memory": i_p_up_mem,
5296 "total_disk": remote_info['vg_size'],
5297 "free_disk": remote_info['vg_free'],
5298 "primary_ip": ninfo.primary_ip,
5299 "secondary_ip": ninfo.secondary_ip,
5300 "total_cpus": remote_info['cpu_total'],
5301 }
5302 node_results[nname] = pnr
5303 data["nodes"] = node_results
5307 for iinfo in i_list:
5308 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5309 for n in iinfo.nics]
5310 pir = {
5311 "tags": list(iinfo.GetTags()),
5312 "should_run": iinfo.status == "up",
5313 "vcpus": iinfo.vcpus,
5314 "memory": iinfo.memory,
5316 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5318 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5319 "disk_template": iinfo.disk_template,
5320 }
5321 instance_data[iinfo.name] = pir
5323 data["instances"] = instance_data
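# rough shape of the structure built so far (illustrative, not exhaustive):
#   {"cluster_name": ..., "cluster_tags": [...], "hypervisor_type": ...,
#    "nodes": {<node name>: {...}}, "instances": {<instance name>: {...}}}
# a "request" key is added later by _AddNewInstance/_AddRelocateInstance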
5327 def _AddNewInstance(self):
5328 """Add new instance data to allocator structure.
5330 This in combination with _ComputeClusterData will create the
5331 correct structure needed as input for the allocator.
5333 The checks for the completeness of the opcode must have already been
5338 if len(self.disks) != 2:
5339 raise errors.OpExecError("Only two-disk configurations supported")
5341 disk_space = _ComputeDiskSize(self.disk_template,
5342 self.disks[0]["size"], self.disks[1]["size"])
5344 if self.disk_template in constants.DTS_NET_MIRROR:
5345 self.required_nodes = 2
5346 else:
5347 self.required_nodes = 1
5351 "disk_template": self.disk_template,
5354 "vcpus": self.vcpus,
5355 "memory": self.mem_size,
5356 "disks": self.disks,
5357 "disk_space_total": disk_space,
5359 "required_nodes": self.required_nodes,
5361 data["request"] = request
5363 def _AddRelocateInstance(self):
5364 """Add relocate instance data to allocator structure.
5366 This in combination with _ComputeClusterData will create the
5367 correct structure needed as input for the allocator.
5369 The checks for the completeness of the opcode must have already been
5373 instance = self.cfg.GetInstanceInfo(self.name)
5374 if instance is None:
5375 raise errors.ProgrammerError("Unknown instance '%s' passed to"
5376 " IAllocator" % self.name)
5378 if instance.disk_template not in constants.DTS_NET_MIRROR:
5379 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5381 if len(instance.secondary_nodes) != 1:
5382 raise errors.OpPrereqError("Instance has not exactly one secondary node")
5384 self.required_nodes = 1
5386 disk_space = _ComputeDiskSize(instance.disk_template,
5387 instance.disks[0].size,
5388 instance.disks[1].size)
5393 "disk_space_total": disk_space,
5394 "required_nodes": self.required_nodes,
5395 "relocate_from": self.relocate_from,
5397 self.in_data["request"] = request
5399 def _BuildInputData(self):
5400 """Build input data structures.
5403 self._ComputeClusterData()
5405 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5406 self._AddNewInstance()
5407 else:
5408 self._AddRelocateInstance()
5410 self.in_text = serializer.Dump(self.in_data)
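# in_text is the serialized form of in_data; this is what is passed to the
# external allocator script by Run()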
5412 def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5413 """Run an instance allocator and return the results.
5418 result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5420 if not isinstance(result, tuple) or len(result) != 4:
5421 raise errors.OpExecError("Invalid result from master iallocator runner")
5423 rcode, stdout, stderr, fail = result
5425 if rcode == constants.IARUN_NOTFOUND:
5426 raise errors.OpExecError("Can't find allocator '%s'" % name)
5427 elif rcode == constants.IARUN_FAILURE:
5428 raise errors.OpExecError("Instance allocator call failed: %s,"
5429 " output: %s" %
5430 (fail, stdout+stderr))
5431 self.out_text = stdout
5432 if validate:
5433 self._ValidateResult()
5435 def _ValidateResult(self):
5436 """Process the allocator results.
5438 This will process and if successful save the result in
5439 self.out_data and the other parameters.
try:
5443 rdict = serializer.Load(self.out_text)
5444 except Exception, err:
5445 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5447 if not isinstance(rdict, dict):
5448 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5450 for key in "success", "info", "nodes":
5451 if key not in rdict:
5452 raise errors.OpExecError("Can't parse iallocator results:"
5453 " missing key '%s'" % key)
5454 setattr(self, key, rdict[key])
5456 if not isinstance(rdict["nodes"], list):
5457 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5458 " not a list")
5459 self.out_data = rdict
5462 class LUTestAllocator(NoHooksLU):
5463 """Run allocator tests.
5465 This LU runs the allocator tests
5468 _OP_REQP = ["direction", "mode", "name"]
5470 def CheckPrereq(self):
5471 """Check prerequisites.
5473 This checks the opcode parameters depending on the direction and mode of the test.
5476 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5477 for attr in ["name", "mem_size", "disks", "disk_template",
5478 "os", "tags", "nics", "vcpus"]:
5479 if not hasattr(self.op, attr):
5480 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5481 attr)
5482 iname = self.cfg.ExpandInstanceName(self.op.name)
5483 if iname is not None:
5484 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5485 iname)
5486 if not isinstance(self.op.nics, list):
5487 raise errors.OpPrereqError("Invalid parameter 'nics'")
5488 for row in self.op.nics:
5489 if (not isinstance(row, dict) or
5492 "bridge" not in row):
5493 raise errors.OpPrereqError("Invalid contents of the"
5494 " 'nics' parameter")
5495 if not isinstance(self.op.disks, list):
5496 raise errors.OpPrereqError("Invalid parameter 'disks'")
5497 if len(self.op.disks) != 2:
5498 raise errors.OpPrereqError("Only two-disk configurations supported")
5499 for row in self.op.disks:
5500 if (not isinstance(row, dict) or
5501 "size" not in row or
5502 not isinstance(row["size"], int) or
5503 "mode" not in row or
5504 row["mode"] not in ['r', 'w']):
5505 raise errors.OpPrereqError("Invalid contents of the"
5506 " 'disks' parameter")
5507 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5508 if not hasattr(self.op, "name"):
5509 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5510 fname = self.cfg.ExpandInstanceName(self.op.name)
5511 if fname is None:
5512 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5513 self.op.name)
5514 self.op.name = fname
5515 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5516 else:
5517 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5520 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5521 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5522 raise errors.OpPrereqError("Missing allocator name")
5523 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5524 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5527 def Exec(self, feedback_fn):
5528 """Run the allocator test.
5531 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5532 ial = IAllocator(self.cfg, self.sstore,
5535 mem_size=self.op.mem_size,
5536 disks=self.op.disks,
5537 disk_template=self.op.disk_template,
5541 vcpus=self.op.vcpus,
5544 ial = IAllocator(self.cfg, self.sstore,
5547 relocate_from=list(self.relocate_from),
5550 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5551 result = ial.in_text
5552 else:
5553 ial.Run(self.op.allocator, validate=False)
5554 result = ial.out_text