4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement CheckPrereq which also fills in the opcode instance
53 with all the fields (even if as None)
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements (REQ_CLUSTER,
58 REQ_MASTER); note that all commands require root permissions
67 def __init__(self, processor, op, cfg, sstore):
68 """Constructor for LogicalUnit.
70 This needs to be overridden in derived classes in order to check op
78 for attr_name in self._OP_REQP:
79 attr_val = getattr(op, attr_name, None)
81 raise errors.OpPrereqError("Required parameter '%s' missing" %
84 if not cfg.IsCluster():
85 raise errors.OpPrereqError("Cluster not initialized yet,"
86 " use 'gnt-cluster init' first.")
88 master = sstore.GetMasterNode()
89 if master != utils.HostInfo().name:
90 raise errors.OpPrereqError("Commands must be run on the master"
93 def CheckPrereq(self):
94 """Check prerequisites for this LU.
96 This method should check that the prerequisites for the execution
97 of this LU are fulfilled. It can do internode communication, but
98 it should be idempotent - no cluster or system changes are
101 The method should raise errors.OpPrereqError in case something is
102 not fulfilled. Its return value is ignored.
104 This method should also update all the parameters of the opcode to
105 their canonical form; e.g. a short node name must be fully
106 expanded after this method has successfully completed (so that
107 hooks, logging, etc. work correctly).
110 raise NotImplementedError
112 def Exec(self, feedback_fn):
115 This method should implement the actual work. It should raise
116 errors.OpExecError for failures that are somewhat dealt with in
120 raise NotImplementedError
122 def BuildHooksEnv(self):
123 """Build hooks environment for this LU.
125 This method should return a three-node tuple consisting of: a dict
126 containing the environment that will be used for running the
127 specific hook for this LU, a list of node names on which the hook
128 should run before the execution, and a list of node names on which
129 the hook should run after the execution.
131 The keys of the dict must not have 'GANETI_' prefixed as this will
132 be handled in the hooks runner. Also note additional keys will be
133 added by the hooks runner. If the LU doesn't define any
134 environment, an empty dict (and not None) should be returned.
136 No nodes should be returned as an empty list (and not None).
138 Note that if the HPATH for a LU class is None, this function will
142 raise NotImplementedError
144 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
145 """Notify the LU about the results of its hooks.
147 This method is called every time a hooks phase is executed, and notifies
148 the Logical Unit about the hooks' result. The LU can then use it to alter
149 its result based on the hooks. By default the method does nothing and the
150 previous result is passed back unchanged but any LU can define it if it
151 wants to use the local cluster hook-scripts somehow.
154 phase: the hooks phase that has just been run
155 hooks_results: the results of the multi-node hooks rpc call
156 feedback_fn: function to send feedback back to the caller
157 lu_result: the previous result this LU had, or None in the PRE phase.
163 class NoHooksLU(LogicalUnit):
164 """Simple LU which runs no hooks.
166 This LU is intended as a parent for other LogicalUnits which will
167 run no hooks, in order to reduce duplicate code.
def _AddHostToEtcHosts(hostname):
  """Add an entry for a host via utils.SetEtcHostsEntry.

  The host is looked up through utils.HostInfo; its ip, its name and a
  one-element list holding its short name are passed on, together with
  the file named by constants.ETC_HOSTS.

  Args:
    hostname: name handed to utils.HostInfo for the lookup

  """
  resolved = utils.HostInfo(name=hostname)
  hosts_file = constants.ETC_HOSTS
  address = resolved.ip
  full_name = resolved.name
  short_names = [resolved.ShortName()]
  utils.SetEtcHostsEntry(hosts_file, address, full_name, short_names)
def _RemoveHostFromEtcHosts(hostname):
  """Remove a host's entries via utils.RemoveEtcHostsEntry.

  The host is looked up through utils.HostInfo and two removals are
  requested against constants.ETC_HOSTS: first for the host's name, then
  for its short name.

  Args:
    hostname: name handed to utils.HostInfo for the lookup

  """
  resolved = utils.HostInfo(name=hostname)
  # Same order as before: the full name goes first, the short name second.
  for getter in (lambda: resolved.name, resolved.ShortName):
    utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, getter())
191 def _GetWantedNodes(lu, nodes):
192 """Returns list of checked and expanded node names.
195 nodes: List of nodes (strings) or None for all
198 if not isinstance(nodes, list):
199 raise errors.OpPrereqError("Invalid argument type 'nodes'")
205 node = lu.cfg.ExpandNodeName(name)
207 raise errors.OpPrereqError("No such node name '%s'" % name)
211 wanted = lu.cfg.GetNodeList()
212 return utils.NiceSort(wanted)
215 def _GetWantedInstances(lu, instances):
216 """Returns list of checked and expanded instance names.
219 instances: List of instances (strings) or None for all
222 if not isinstance(instances, list):
223 raise errors.OpPrereqError("Invalid argument type 'instances'")
228 for name in instances:
229 instance = lu.cfg.ExpandInstanceName(name)
231 raise errors.OpPrereqError("No such instance name '%s'" % name)
232 wanted.append(instance)
235 wanted = lu.cfg.GetInstanceList()
236 return utils.NiceSort(wanted)
239 def _CheckOutputFields(static, dynamic, selected):
240 """Checks whether all selected fields are valid.
243 static: Static fields
244 dynamic: Dynamic fields
247 static_fields = frozenset(static)
248 dynamic_fields = frozenset(dynamic)
250 all_fields = static_fields | dynamic_fields
252 if not all_fields.issuperset(selected):
253 raise errors.OpPrereqError("Unknown output fields selected: %s"
254 % ",".join(frozenset(selected).
255 difference(all_fields)))
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259 memory, vcpus, nics):
260 """Builds instance related env variables for hooks from single variables.
263 secondary_nodes: List of secondary nodes as strings
267 "INSTANCE_NAME": name,
268 "INSTANCE_PRIMARY": primary_node,
269 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270 "INSTANCE_OS_TYPE": os_type,
271 "INSTANCE_STATUS": status,
272 "INSTANCE_MEMORY": memory,
273 "INSTANCE_VCPUS": vcpus,
277 nic_count = len(nics)
278 for idx, (ip, bridge, mac) in enumerate(nics):
281 env["INSTANCE_NIC%d_IP" % idx] = ip
282 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
287 env["INSTANCE_NIC_COUNT"] = nic_count
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  The attributes of the instance are collected into keyword arguments for
  _BuildInstanceHookEnv, optionally replaced by the caller's overrides,
  and the result of that function is returned unchanged.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override; its keys are the keyword
      argument names of _BuildInstanceHookEnv (name, primary_node,
      secondary_nodes, os_type, status, memory, vcpus, nics)

  Returns:
    The value returned by _BuildInstanceHookEnv

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # The status must come from the instance's status attribute; the OS
    # name is already exported as os_type above.
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  # override defaults to None, so only merge when something was given.
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315 """Ensure a node has a correct known_hosts entry.
318 fullnode - Fully qualified domain name of host. (str)
319 ip - IPv4 address of host (str)
320 pubkey - the public key of the cluster
323 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
326 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
335 logger.Debug('read %s' % (repr(rawline),))
337 parts = rawline.rstrip('\r\n').split()
339 # Ignore unwanted lines
340 if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341 fields = parts[0].split(',')
346 for spec in [ ip, fullnode ]:
347 if spec not in fields:
352 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353 if haveall and key == pubkey:
355 save_lines.append(rawline)
356 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
359 if havesome and (not haveall or key != pubkey):
361 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
364 save_lines.append(rawline)
367 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
371 save_lines = save_lines + add_lines
373 # Write a new file and replace old.
374 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
376 newfile = os.fdopen(fd, 'w')
378 newfile.write(''.join(save_lines))
381 logger.Debug("Wrote new known_hosts.")
382 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
385 # Simply appending a new line will do the trick.
387 for add in add_lines:
393 def _HasValidVG(vglist, vgname):
394 """Checks if the volume group list is valid.
396 A non-None return value means there's an error, and the return value
397 is the error message.
400 vgsize = vglist.get(vgname, None)
402 return "volume group '%s' missing" % vgname
404 return ("volume group '%s' too small (20480MiB required, %dMib found)" %
409 def _InitSSHSetup(node):
410 """Setup the SSH configuration for the cluster.
413 This generates a dsa keypair for root, adds the pub key to the
414 permitted hosts and adds the hostkey to its own known hosts.
417 node: the name of this host as a fqdn
420 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
422 for name in priv_key, pub_key:
423 if os.path.exists(name):
424 utils.CreateBackup(name)
425 utils.RemoveFile(name)
427 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
431 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
434 f = open(pub_key, 'r')
436 utils.AddAuthorizedKey(auth_keys, f.read(8192))
441 def _InitGanetiServerSetup(ss):
442 """Setup the necessary configuration for the initial node daemon.
444 This creates the nodepass file containing the shared password for
445 the cluster and also generates the SSL certificate.
448 # Create pseudo random password
449 randpass = sha.new(os.urandom(64)).hexdigest()
450 # and write it into sstore
451 ss.SetKey(ss.SS_NODED_PASS, randpass)
453 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454 "-days", str(365*5), "-nodes", "-x509",
455 "-keyout", constants.SSL_CERT_FILE,
456 "-out", constants.SSL_CERT_FILE, "-batch"])
458 raise errors.OpExecError("could not generate server ssl cert, command"
459 " %s had exitcode %s and error message %s" %
460 (result.cmd, result.exit_code, result.output))
462 os.chmod(constants.SSL_CERT_FILE, 0400)
464 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
467 raise errors.OpExecError("Could not start the node daemon, command %s"
468 " had exitcode %s and error %s" %
469 (result.cmd, result.exit_code, result.output))
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  The bridge of every NIC of the instance is collected and
  rpc.call_bridges_exist is asked about them on the instance's primary
  node.

  Args:
    instance: the instance whose NIC bridges are checked; its nics and
      primary_node attributes are read

  Raises:
    errors.OpPrereqError: if the RPC call returns a false value

  """
  # collect the bridge of each NIC, in NIC order
  bridges = []
  for nic in instance.nics:
    bridges.append(nic.bridge)

  if rpc.call_bridges_exist(instance.primary_node, bridges):
    return

  raise errors.OpPrereqError("one or more target bridges %s does not"
                             " exist on destination node '%s'" %
                             (bridges, instance.primary_node))
484 class LUInitCluster(LogicalUnit):
485 """Initialise the cluster.
488 HPATH = "cluster-init"
489 HTYPE = constants.HTYPE_CLUSTER
490 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491 "def_bridge", "master_netdev"]
494 def BuildHooksEnv(self):
497 Notes: Since we don't require a cluster, we must manually add
498 ourselves in the post-run node list.
501 env = {"OP_TARGET": self.op.cluster_name}
502 return env, [], [self.hostname.name]
504 def CheckPrereq(self):
505 """Verify that the passed name is a valid one.
508 if config.ConfigWriter.IsCluster():
509 raise errors.OpPrereqError("Cluster is already initialised")
511 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512 if not os.path.exists(constants.VNC_PASSWORD_FILE):
513 raise errors.OpPrereqError("Please prepare the cluster VNC"
515 constants.VNC_PASSWORD_FILE)
517 self.hostname = hostname = utils.HostInfo()
519 if hostname.ip.startswith("127."):
520 raise errors.OpPrereqError("This host's IP resolves to the private"
521 " range (%s). Please fix DNS or %s." %
522 (hostname.ip, constants.ETC_HOSTS))
524 if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525 source=constants.LOCALHOST_IP_ADDRESS):
526 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527 " to %s,\nbut this ip address does not"
528 " belong to this host."
529 " Aborting." % hostname.ip)
531 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
533 if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
535 raise errors.OpPrereqError("Cluster IP already active. Aborting.")
537 secondary_ip = getattr(self.op, "secondary_ip", None)
538 if secondary_ip and not utils.IsValidIP(secondary_ip):
539 raise errors.OpPrereqError("Invalid secondary ip given")
541 secondary_ip != hostname.ip and
542 (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543 source=constants.LOCALHOST_IP_ADDRESS))):
544 raise errors.OpPrereqError("You gave %s as secondary IP,"
545 " but it does not belong to this host." %
547 self.secondary_ip = secondary_ip
549 # checks presence of the volume group given
550 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
553 raise errors.OpPrereqError("Error: %s" % vgstatus)
555 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
557 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
560 if self.op.hypervisor_type not in constants.HYPER_TYPES:
561 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562 self.op.hypervisor_type)
564 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
566 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567 (self.op.master_netdev,
568 result.output.strip()))
570 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572 raise errors.OpPrereqError("Init.d script '%s' missing or not"
573 " executable." % constants.NODE_INITD_SCRIPT)
575 def Exec(self, feedback_fn):
576 """Initialize the cluster.
579 clustername = self.clustername
580 hostname = self.hostname
582 # set up the simple store
583 self.sstore = ss = ssconf.SimpleStore()
584 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
590 # set up the inter-node password and certificate
591 _InitGanetiServerSetup(ss)
593 # start the master ip
594 rpc.call_node_start_master(hostname.name)
596 # set up ssh config and /etc/hosts
597 f = open(constants.SSH_HOST_RSA_PUB, 'r')
602 sshkey = sshline.split(" ")[1]
604 _AddHostToEtcHosts(hostname.name)
606 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
608 _InitSSHSetup(hostname.name)
610 # init of cluster config file
611 self.cfg = cfgw = config.ConfigWriter()
612 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613 sshkey, self.op.mac_prefix,
614 self.op.vg_name, self.op.def_bridge)
617 class LUDestroyCluster(NoHooksLU):
618 """Logical unit for destroying the cluster.
623 def CheckPrereq(self):
624 """Check prerequisites.
626 This checks whether the cluster is empty.
628 Any errors are signalled by raising errors.OpPrereqError.
631 master = self.sstore.GetMasterNode()
633 nodelist = self.cfg.GetNodeList()
634 if len(nodelist) != 1 or nodelist[0] != master:
635 raise errors.OpPrereqError("There are still %d node(s) in"
636 " this cluster." % (len(nodelist) - 1))
637 instancelist = self.cfg.GetInstanceList()
639 raise errors.OpPrereqError("There are still %d instance(s) in"
640 " this cluster." % len(instancelist))
642 def Exec(self, feedback_fn):
643 """Destroys the cluster.
646 master = self.sstore.GetMasterNode()
647 if not rpc.call_node_stop_master(master):
648 raise errors.OpExecError("Could not disable the master role")
649 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650 utils.CreateBackup(priv_key)
651 utils.CreateBackup(pub_key)
652 rpc.call_node_leave_cluster(master)
655 class LUVerifyCluster(LogicalUnit):
656 """Verifies the cluster status.
659 HPATH = "cluster-verify"
660 HTYPE = constants.HTYPE_CLUSTER
661 _OP_REQP = ["skip_checks"]
663 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664 remote_version, feedback_fn):
665 """Run multiple tests against a node.
668 - compares ganeti version
669 - checks vg existence and size > 20G
670 - checks config file checksum
671 - checks ssh to other nodes
674 node: name of the node to check
675 file_list: required list of files
676 local_cksum: dictionary of local files and their checksums
679 # compares ganeti version
680 local_version = constants.PROTOCOL_VERSION
681 if not remote_version:
682 feedback_fn(" - ERROR: connection to %s failed" % (node))
685 if local_version != remote_version:
686 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
687 (local_version, node, remote_version))
690 # checks vg existence and size > 20G
694 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
698 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
700 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
703 # checks config file checksum
706 if 'filelist' not in node_result:
708 feedback_fn(" - ERROR: node hasn't returned file checksum data")
710 remote_cksum = node_result['filelist']
711 for file_name in file_list:
712 if file_name not in remote_cksum:
714 feedback_fn(" - ERROR: file '%s' missing" % file_name)
715 elif remote_cksum[file_name] != local_cksum[file_name]:
717 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
719 if 'nodelist' not in node_result:
721 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
723 if node_result['nodelist']:
725 for node in node_result['nodelist']:
726 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
727 (node, node_result['nodelist'][node]))
728 if 'node-net-test' not in node_result:
730 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
732 if node_result['node-net-test']:
734 nlist = utils.NiceSort(node_result['node-net-test'].keys())
736 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
737 (node, node_result['node-net-test'][node]))
739 hyp_result = node_result.get('hypervisor', None)
740 if hyp_result is not None:
741 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
744 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
745 node_instance, feedback_fn):
746 """Verify an instance.
748 This function checks to see if the required block devices are
749 available on the instance's node.
754 node_current = instanceconfig.primary_node
757 instanceconfig.MapLVsByNode(node_vol_should)
759 for node in node_vol_should:
760 for volume in node_vol_should[node]:
761 if node not in node_vol_is or volume not in node_vol_is[node]:
762 feedback_fn(" - ERROR: volume %s missing on node %s" %
766 if not instanceconfig.status == 'down':
767 if (node_current not in node_instance or
768 not instance in node_instance[node_current]):
769 feedback_fn(" - ERROR: instance %s not running on node %s" %
770 (instance, node_current))
773 for node in node_instance:
774 if (not node == node_current):
775 if instance in node_instance[node]:
776 feedback_fn(" - ERROR: instance %s should not run on node %s" %
782 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
783 """Verify if there are any unknown volumes in the cluster.
785 The .os, .swap and backup volumes are ignored. All other volumes are
791 for node in node_vol_is:
792 for volume in node_vol_is[node]:
793 if node not in node_vol_should or volume not in node_vol_should[node]:
794 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
799 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
800 """Verify the list of running instances.
802 This checks what instances are running but unknown to the cluster.
806 for node in node_instance:
807 for runninginstance in node_instance[node]:
808 if runninginstance not in instancelist:
809 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
810 (runninginstance, node))
814 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
815 """Verify N+1 Memory Resilience.
817 Check that if one single node dies we can still start all the instances it
823 for node, nodeinfo in node_info.iteritems():
824 # This code checks that every node which is now listed as secondary has
825 # enough memory to host all instances it is supposed to should a single
826 # other node in the cluster fail.
827 # FIXME: not ready for failover to an arbitrary node
828 # FIXME: does not support file-backed instances
829 # WARNING: we currently take into account down instances as well as up
830 # ones, considering that even if they're down someone might want to start
831 # them even in the event of a node failure.
832 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
834 for instance in instances:
835 needed_mem += instance_cfg[instance].memory
836 if nodeinfo['mfree'] < needed_mem:
837 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
838 " failovers should node %s fail" % (node, prinode))
842 def CheckPrereq(self):
843 """Check prerequisites.
845 Transform the list of checks we're going to skip into a set and check that
846 all its members are valid.
849 self.skip_set = frozenset(self.op.skip_checks)
850 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
851 raise errors.OpPrereqError("Invalid checks to be skipped specified")
853 def BuildHooksEnv(self):
856 Cluster-Verify hooks just run in the post phase and their failure makes
857 the output be logged in the verify output and the verification to fail.
860 all_nodes = self.cfg.GetNodeList()
861 tags = self.cfg.GetClusterInfo().GetTags()
862 # TODO: populate the environment with useful information for verify hooks
864 "CLUSTER_TAGS": " ".join(tags),
866 return env, [], all_nodes
868 def Exec(self, feedback_fn):
869 """Verify integrity of cluster, performing various test on nodes.
873 feedback_fn("* Verifying global settings")
874 for msg in self.cfg.VerifyConfig():
875 feedback_fn(" - ERROR: %s" % msg)
877 vg_name = self.cfg.GetVGName()
878 nodelist = utils.NiceSort(self.cfg.GetNodeList())
879 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
880 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
881 i_non_redundant = [] # Non redundant instances
887 # FIXME: verify OS list
889 file_names = list(self.sstore.GetFileList())
890 file_names.append(constants.SSL_CERT_FILE)
891 file_names.append(constants.CLUSTER_CONF_FILE)
892 local_checksums = utils.FingerprintFiles(file_names)
894 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
895 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
896 all_instanceinfo = rpc.call_instance_list(nodelist)
897 all_vglist = rpc.call_vg_list(nodelist)
898 node_verify_param = {
899 'filelist': file_names,
900 'nodelist': nodelist,
902 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
903 for node in nodeinfo]
905 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
906 all_rversion = rpc.call_version(nodelist)
907 all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
909 incomplete_nodeinfo = False
911 for node in nodelist:
912 feedback_fn("* Verifying node %s" % node)
913 result = self._VerifyNode(node, file_names, local_checksums,
914 all_vglist[node], all_nvinfo[node],
915 all_rversion[node], feedback_fn)
919 volumeinfo = all_volumeinfo[node]
921 if isinstance(volumeinfo, basestring):
922 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
923 (node, volumeinfo[-400:].encode('string_escape')))
925 node_volume[node] = {}
926 elif not isinstance(volumeinfo, dict):
927 feedback_fn(" - ERROR: connection to %s failed" % (node,))
929 incomplete_nodeinfo = True
932 node_volume[node] = volumeinfo
935 nodeinstance = all_instanceinfo[node]
936 if type(nodeinstance) != list:
937 feedback_fn(" - ERROR: connection to %s failed" % (node,))
939 incomplete_nodeinfo = True
942 node_instance[node] = nodeinstance
945 nodeinfo = all_ninfo[node]
946 if not isinstance(nodeinfo, dict):
947 feedback_fn(" - ERROR: connection to %s failed" % (node,))
949 incomplete_nodeinfo = True
954 "mfree": int(nodeinfo['memory_free']),
955 "dfree": int(nodeinfo['vg_free']),
958 # dictionary holding all instances this node is secondary for,
959 # grouped by their primary node. Each key is a cluster node, and each
960 # value is a list of instances which have the key as primary and the
961 # current node as secondary. this is handy to calculate N+1 memory
962 # availability if you can only failover from a primary to its
964 "sinst-by-pnode": {},
966 except (ValueError, TypeError):
967 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
969 incomplete_nodeinfo = True
974 for instance in instancelist:
975 feedback_fn("* Verifying instance %s" % instance)
976 inst_config = self.cfg.GetInstanceInfo(instance)
977 result = self._VerifyInstance(instance, inst_config, node_volume,
978 node_instance, feedback_fn)
981 inst_config.MapLVsByNode(node_vol_should)
983 instance_cfg[instance] = inst_config
985 pnode = inst_config.primary_node
986 if pnode in node_info:
987 node_info[pnode]['pinst'].append(instance)
989 feedback_fn(" - ERROR: instance %s, connection to primary node"
990 " %s failed" % (instance, pnode))
993 # If the instance is non-redundant we cannot survive losing its primary
994 # node, so we are not N+1 compliant. On the other hand we have no disk
995 # templates with more than one secondary so that situation is not well
997 # FIXME: does not support file-backed instances
998 if len(inst_config.secondary_nodes) == 0:
999 i_non_redundant.append(instance)
1000 elif len(inst_config.secondary_nodes) > 1:
1001 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1004 for snode in inst_config.secondary_nodes:
1005 if snode in node_info:
1006 node_info[snode]['sinst'].append(instance)
1007 if pnode not in node_info[snode]['sinst-by-pnode']:
1008 node_info[snode]['sinst-by-pnode'][pnode] = []
1009 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1011 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1012 " %s failed" % (instance, snode))
1014 feedback_fn("* Verifying orphan volumes")
1015 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1019 feedback_fn("* Verifying remaining instances")
1020 result = self._VerifyOrphanInstances(instancelist, node_instance,
1024 if (constants.VERIFY_NPLUSONE_MEM not in self.skip_set and
1025 not incomplete_nodeinfo):
1026 feedback_fn("* Verifying N+1 Memory redundancy")
1027 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1030 feedback_fn("* Other Notes")
1032 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1033 % len(i_non_redundant))
1037 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1038 """Analyze the post-hooks' result, handle it, and send some
1039 nicely-formatted feedback back to the user.
1042 phase: the hooks phase that has just been run
1043 hooks_results: the results of the multi-node hooks rpc call
1044 feedback_fn: function to send feedback back to the caller
1045 lu_result: previous Exec result
1048 # We only really run POST phase hooks, and are only interested in their results
1049 if phase == constants.HOOKS_PHASE_POST:
1050 # Used to change hooks' output to proper indentation
1051 indent_re = re.compile('^', re.M)
1052 feedback_fn("* Hooks Results")
1053 if not hooks_results:
1054 feedback_fn(" - ERROR: general communication failure")
1057 for node_name in hooks_results:
1058 show_node_header = True
1059 res = hooks_results[node_name]
1060 if res is False or not isinstance(res, list):
1061 feedback_fn(" Communication failure")
1064 for script, hkr, output in res:
1065 if hkr == constants.HKR_FAIL:
1066 # The node header is only shown once, if there are
1067 # failing hooks on that node
1068 if show_node_header:
1069 feedback_fn(" Node %s:" % node_name)
1070 show_node_header = False
1071 feedback_fn(" ERROR: Script %s failed, output:" % script)
1072 output = indent_re.sub(' ', output)
1073 feedback_fn("%s" % output)
1079 class LUVerifyDisks(NoHooksLU):
1080 """Verifies the cluster disks status.
1085 def CheckPrereq(self):
1086 """Check prerequisites.
1088 This has no prerequisites.
1093 def Exec(self, feedback_fn):
1094 """Verify integrity of cluster disks.
1097 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1099 vg_name = self.cfg.GetVGName()
1100 nodes = utils.NiceSort(self.cfg.GetNodeList())
1101 instances = [self.cfg.GetInstanceInfo(name)
1102 for name in self.cfg.GetInstanceList()]
1105 for inst in instances:
1107 if (inst.status != "up" or
1108 inst.disk_template not in constants.DTS_NET_MIRROR):
1110 inst.MapLVsByNode(inst_lvs)
1111 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1112 for node, vol_list in inst_lvs.iteritems():
1113 for vol in vol_list:
1114 nv_dict[(node, vol)] = inst
1119 node_lvs = rpc.call_volume_list(nodes, vg_name)
1124 lvs = node_lvs[node]
1126 if isinstance(lvs, basestring):
1127 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1128 res_nlvm[node] = lvs
1129 elif not isinstance(lvs, dict):
1130 logger.Info("connection to node %s failed or invalid data returned" %
1132 res_nodes.append(node)
1135 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1136 inst = nv_dict.pop((node, lv_name), None)
1137 if (not lv_online and inst is not None
1138 and inst.name not in res_instances):
1139 res_instances.append(inst.name)
1141 # any leftover items in nv_dict are missing LVs, let's arrange the
1143 for key, inst in nv_dict.iteritems():
1144 if inst.name not in res_missing:
1145 res_missing[inst.name] = []
1146 res_missing[inst.name].append(key)
class LURenameCluster(LogicalUnit):
  """Logical unit changing the cluster name and/or the master IP.

  CheckPrereq resolves the requested name; Exec stops the master role,
  rewrites the simple store keys, pushes the changed key files to the
  other nodes and then starts the master role again.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    The hooks run on the master node only, in both phases.

    """
    hook_env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    master_node = self.sstore.GetMasterNode()
    return hook_env, [master_node], [master_node]

  def CheckPrereq(self):
    """Resolve the new name and verify that something actually changes.

    On success self.ip holds the resolved IP and self.op.name the
    resolved name.

    Raises errors.OpPrereqError if neither name nor IP differ from the
    current values, or if the new IP already answers to fping.

    """
    resolved = utils.HostInfo(self.op.name)

    target_name = resolved.name
    target_ip = resolved.ip
    self.ip = target_ip
    current_name = self.sstore.GetClusterName()
    current_ip = self.sstore.GetMasterIP()

    name_changed = target_name != current_name
    ip_changed = target_ip != current_ip
    if not (name_changed or ip_changed):
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if ip_changed:
      # an address that already answers is in use by somebody else
      ping = utils.RunCmd(["fping", "-q", target_ip])
      if not ping.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   target_ip)

    self.op.name = target_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    Raises errors.OpExecError if the master role cannot be stopped.
    Failures to copy a file to a node, or to restart the master role,
    are only logged.

    """
    sstore = self.sstore
    new_name = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_name = sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master_name):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      sstore.SetKey(sstore.SS_MASTER_IP, new_ip)
      sstore.SetKey(sstore.SS_CLUSTER_NAME, new_name)

      # Distribute updated ss config to all nodes but the master itself
      master_info = self.cfg.GetNodeInfo(master_name)
      targets = self.cfg.GetNodeList()
      if master_info.name in targets:
        targets.remove(master_info.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for key in [sstore.SS_CLUSTER_NAME, sstore.SS_MASTER_IP]:
        key_file = sstore.KeyToFilename(key)
        upload_status = rpc.call_upload_file(targets, key_file)
        for target in targets:
          if upload_status[target]:
            continue
          logger.Error("copy of file %s to node %s failed" %
                       (key_file, target))
    finally:
      # the master role is brought back even if the update above failed
      if not rpc.call_node_start_master(master_name):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
1230 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1231 """Sleep and poll for an instance's disk to sync.
1234 if not instance.disks:
1238 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1240 node = instance.primary_node
1242 for dev in instance.disks:
1243 cfgw.SetDiskID(dev, node)
1249 cumul_degraded = False
1250 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1252 proc.LogWarning("Can't get any data from node %s" % node)
1255 raise errors.RemoteError("Can't contact node %s for mirror data,"
1256 " aborting." % node)
1260 for i in range(len(rstats)):
1263 proc.LogWarning("Can't compute data for node %s/%s" %
1264 (node, instance.disks[i].iv_name))
1266 # we ignore the ldisk parameter
1267 perc_done, est_time, is_degraded, _ = mstat
1268 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1269 if perc_done is not None:
1271 if est_time is not None:
1272 rem_time = "%d estimated seconds remaining" % est_time
1275 rem_time = "no time estimate"
1276 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1277 (instance.disks[i].iv_name, perc_done, rem_time))
1284 time.sleep(min(60, max_time))
1290 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1291 return not cumul_degraded
1294 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1295 """Check that mirrors are not degraded.
1297 The ldisk parameter, if True, will change the test from the
1298 is_degraded attribute (which represents overall non-ok status for
1299 the device(s)) to the ldisk (representing the local storage status).
1302 cfgw.SetDiskID(dev, node)
1309 if on_primary or dev.AssembleOnSecondary():
1310 rstats = rpc.call_blockdev_find(node, dev)
1312 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1315 result = result and (not rstats[idx])
1317 for child in dev.children:
1318 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  Produces one row per OS name found on the nodes, holding the fields
  requested in the opcode ("name", "valid", "node_status").

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    Only a query over all OSes is supported: a non-empty names list is
    rejected with errors.OpPrereqError. The requested output fields are
    validated through _CheckOutputFields.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remap a per-node result into a per-os per-node dictionary.

    Args:
      node_list: a list with the names of all nodes
      rlist: a map with node names as keys and lists of OS objects as
             values; nodes with an empty/false value are skipped

    Returns:
      a map with OS names as keys and, as values, another map with
      node names as keys and lists of OS objects as values, e.g.
        {"debian-etch": {"node1": [<object>,...],
                         "node2": [<object>,]}
        }
      Every node in node_list gets an entry (possibly an empty list)
      for every OS seen.

    """
    all_os = {}
    for node_name, node_oses in rlist.items():
      if not node_oses:
        continue
      for os_obj in node_oses:
        per_node = all_os.get(os_obj.name)
        if per_node is None:
          # first time this OS shows up: start with an empty list for
          # each node in node_list
          per_node = dict([(nname, []) for nname in node_list])
          all_os[os_obj.name] = per_node
        per_node[node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    Returns a list of rows, one per OS, each row holding the values of
    the requested output fields in request order.

    Raises errors.OpExecError if the diagnose RPC returns False and
    errors.ParameterError for an unknown field.

    """
    node_names = self.cfg.GetNodeList()
    diagnosis = rpc.call_os_diagnose(node_names)
    if diagnosis == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    by_os = self._DiagnoseByOS(node_names, diagnosis)

    rows = []
    for os_name, per_node in by_os.items():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          value = os_name
        elif field == "valid":
          # per node: non-empty list whose first entry is true
          value = utils.all([lst and lst[0] for lst in per_node.values()])
        elif field == "node_status":
          value = dict([(nname, [(o.status, o.path) for o in os_list])
                        for nname, os_list in per_node.items()])
        else:
          raise errors.ParameterError(field)
        row.append(value)
      rows.append(row)

    return rows
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run; the hooks run on all the other
    nodes in both phases.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    On success the opcode's node_name is replaced by the canonical node
    name and the node object is stored in self.node.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # call form instead of the old "raise Class, value" statement, like
      # every other raise in this module
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    The node is asked to leave the cluster, its node daemon is stopped
    over ssh, and then the node is dropped from the configuration and
    from /etc/hosts (via _RemoveHostFromEtcHosts).

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields and
    computes the list of wanted nodes (self.wanted).

    """
    # fields that need live data from the nodes
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns a list with one row per wanted node; each row holds the
    values of the requested fields, in request order. Live fields of a
    node that returned no data are reported as None.

    """
    wanted = self.wanted
    node_objs = [self.cfg.GetNodeInfo(name) for name in wanted]
    fields = self.op.output_fields

    # begin data gathering

    # the nodes are only contacted if at least one live field is wanted
    if self.dynamic_fields.intersection(fields):
      # output field -> key of the node_info answer, converted to int
      int_fields = (
        ("mtotal", 'memory_total'),
        ("mnode", 'memory_dom0'),
        ("mfree", 'memory_free'),
        ("dtotal", 'vg_size'),
        ("dfree", 'vg_free'),
        ("ctotal", 'cpu_total'),
        )
      live_data = {}
      raw_info = rpc.call_node_info(wanted, self.cfg.GetVGName())
      for name in wanted:
        info = raw_info.get(name, None)
        node_live = {}
        if info:
          for field, key in int_fields:
            node_live[field] = utils.TryConvert(int, info[key])
          node_live["bootid"] = info['bootid']
        live_data[name] = node_live
    else:
      live_data = dict([(name, {}) for name in wanted])

    node_to_primary = dict([(name, set()) for name in wanted])
    node_to_secondary = dict([(name, set()) for name in wanted])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    # the instance list is only walked if an instance field is wanted
    if inst_fields & frozenset(fields):
      for instance_name in self.cfg.GetInstanceList():
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    rows = []
    for node in node_objs:
      row = []
      for field in fields:
        if field == "name":
          value = node.name
        elif field == "pinst_list":
          value = list(node_to_primary[node.name])
        elif field == "sinst_list":
          value = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          value = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          value = len(node_to_secondary[node.name])
        elif field == "pip":
          value = node.primary_ip
        elif field == "sip":
          value = node.secondary_ip
        elif field == "tags":
          value = list(node.GetTags())
        elif field in self.dynamic_fields:
          value = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        row.append(value)
      rows.append(row)

    return rows
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This computes the list of wanted nodes (self.nodes) and checks that
    the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    Returns a list with one row per volume, nodes in the wanted order
    and volumes sorted by their 'dev' entry; all values are converted
    to strings. Nodes without volume data are skipped. The "instance"
    field is '-' for volumes not used by any instance.

    """
    wanted = self.nodes
    volumes = rpc.call_node_volumes(wanted)

    instances = [self.cfg.GetInstanceInfo(iname) for iname
                 in self.cfg.GetInstanceList()]

    # instance object -> {node: [lv names]}
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in instances])

    rows = []
    for node in wanted:
      if node not in volumes or not volumes[node]:
        continue

      for vol in sorted(volumes[node], key=lambda vol: vol['dev']):
        row = []
        for field in self.op.output_fields:
          if field == "node":
            value = node
          elif field == "phys":
            value = vol['dev']
          elif field == "vg":
            value = vol['vg']
          elif field == "name":
            value = vol['name']
          elif field == "size":
            value = int(float(vol['size']))
          elif field == "instance":
            # first instance owning an LV with this name on this node
            value = '-'
            for inst in instances:
              inst_lvs = lv_by_node[inst]
              if node in inst_lvs and vol['name'] in inst_lvs[node]:
                value = inst.name
                break
          else:
            raise errors.ParameterError(field)
          row.append(str(value))

        rows.append(row)

    return rows
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    hook_env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    pre_nodes = self.cfg.GetNodeList()
    post_nodes = pre_nodes + [self.op.node_name, ]
    return hook_env, pre_nodes, post_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config (or, for a re-add,
       that it is, with unchanged IP addresses)
     - it is resolvable
     - its addresses do not clash with those of an existing node
     - its parameters (single/dual homed) matches the cluster
     - it answers on the node daemon port
     - the VNC password file exists if the hypervisor type is
       constants.HT_XEN_HVM31

    On success the opcode's primary_ip/secondary_ip are filled in and
    the node object to add is stored in self.new_node.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    cfg = self.cfg
    readd = self.op.readd

    resolved = utils.HostInfo(self.op.node_name)

    node = resolved.name
    primary_ip = resolved.ip
    self.op.primary_ip = primary_ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      # single-homed node: both addresses are the same
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    known_nodes = cfg.GetNodeList()
    if not readd and node in known_nodes:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif readd and node not in known_nodes:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    new_ips = (primary_ip, secondary_ip)
    for other_name in known_nodes:
      other = cfg.GetNodeInfo(other_name)

      if readd and node == other_name:
        if (other.primary_ip != primary_ip or
            other.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if other.primary_ip in new_ips or other.secondary_ip in new_ips:
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % other.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    master = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = master.secondary_ip == master.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed and not newbie_singlehomed:
      raise errors.OpPrereqError("The master has no private ip but the"
                                 " new node has one")
    if newbie_singlehomed and not master_singlehomed:
      raise errors.OpPrereqError("The master has a private ip but the"
                                 " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    # check reachability from my secondary ip to newbie's secondary ip
    if (not newbie_singlehomed and
        not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                          source=master.secondary_ip)):
      raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                 " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if (self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31 and
        not os.path.exists(constants.VNC_PASSWORD_FILE)):
      raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                 constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    Steps, in order: push the node daemon password and SSL certificate
    over ssh and restart the node daemon, verify the protocol version,
    transfer the ssh keys, update /etc/hosts and known_hosts, verify the
    secondary ip and the hostname, distribute the hosts files and the
    simple store files, and finally (unless re-adding) add the node to
    the configuration.

    Raises errors.OpExecError on any fatal step; failed file copies are
    only logged.

    """
    new_node = self.new_node
    node = new_node.name
    sstore = self.sstore

    # set up inter-node password and certificate and restarts the node daemon
    node_pass = sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', node_pass):
      raise errors.OpExecError("ganeti password corruption detected")
    cert_file = open(constants.SSL_CERT_FILE)
    try:
      cert_pem = cert_file.read(8192)
    finally:
      cert_file.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search(r'^!EOF\.', cert_pem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not cert_pem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    setup_cmd = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (node_pass, sstore.KeyToFilename(sstore.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, cert_pem,
                  constants.NODE_INITD_SCRIPT))

    ssh_result = ssh.SSHCall(node, 'root', setup_cmd, batch=False,
                             ask_key=True)
    if ssh_result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, ssh_result.fail_reason,
                                ssh_result.output))

    # check connectivity; give the restarted daemon some time first
    time.sleep(4)

    version = rpc.call_version([node])[node]
    if not version:
      raise errors.OpExecError("Cannot get version from the new node")
    if constants.PROTOCOL_VERSION != version:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, version))
    logger.Info("communication to node %s fine, sw version %s match" %
                (node, version))

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    key_paths = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                 priv_key, pub_key]

    key_contents = []
    for key_path in key_paths:
      key_file = open(key_path, 'r')
      try:
        key_contents.append(key_file.read())
      finally:
        key_file.close()

    # the six keys are passed in the order of key_paths
    if not rpc.call_node_add(node, *key_contents):
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      has_secondary = rpc.call_node_tcp_ping(new_node.name,
                                             constants.LOCALHOST_IP_ADDRESS,
                                             new_node.secondary_ip,
                                             constants.DEFAULT_NODED_PORT,
                                             10, False)
      if not has_secondary:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    name_ok, name_msg = ssh.VerifyNodeHostname(node)
    if not name_ok:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, name_msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    master = self.cfg.GetNodeInfo(sstore.GetMasterNode())
    targets = self.cfg.GetNodeList()
    if not self.op.readd:
      targets.append(node)
    if master.name in targets:
      targets.remove(master.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      upload_status = rpc.call_upload_file(targets, fname)
      for target in targets:
        if upload_status[target]:
          continue
        logger.Error("copy of file %s to node %s failed" %
                     (fname, target))

    # the simple store files only go to the new node
    to_copy = sstore.GetFileList()
    if sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master. The name of the
    local host becomes self.new_master, the master recorded in the
    simple store becomes self.old_master.

    Raises errors.OpPrereqError if both names are the same.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This commands must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    None of the three steps (stopping the old master, distributing the
    new master name, starting the new master) aborts the operation on
    failure; problems are logged and, for the last step, also reported
    through feedback_fn.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      # this branch is the failure case, hence "could not"
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    The result is a dictionary with the cluster name, the software,
    protocol, config, OS API and export versions, the master node, the
    local architecture and the hypervisor type.

    """
    sstore = self.sstore
    info = {}
    info["name"] = sstore.GetClusterName()
    info["software_version"] = constants.RELEASE_VERSION
    info["protocol_version"] = constants.PROTOCOL_VERSION
    info["config_version"] = constants.CONFIG_VERSION
    info["os_api_version"] = constants.OS_API_VERSION
    info["export_version"] = constants.EXPORT_VERSION
    info["master"] = sstore.GetMasterNode()
    # architecture of the machine this code runs on
    info["architecture"] = (platform.architecture()[0], platform.machine())
    info["hypervisor_type"] = sstore.GetHypervisorType()
    return info
class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the named file exists (errors.OpPrereqError
    otherwise) and computes the list of target nodes (self.nodes) from
    the opcode via _GetWantedNodes.

    """
    filename = self.op.filename
    if not os.path.exists(filename):
      raise errors.OpPrereqError("No such filename '%s'" % filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    The file named in the opcode is copied to every node in self.nodes
    except the local host; a failed copy is logged and does not stop
    the remaining copies.

    """
    filename = self.op.filename

    local_name = utils.HostInfo().name

    targets = [node for node in self.nodes if node != local_name]
    for node in targets:
      if ssh.CopyFileToNode(node, filename):
        continue
      logger.Error("Copy of file %s to node %s failed" % (filename, node))
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Return whatever the configuration object's DumpConfig() produces.

    """
    config_dump = self.cfg.DumpConfig()
    return config_dump
class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It computes the list of target nodes (self.nodes) from the opcode
    via _GetWantedNodes.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    The command is run over ssh as root on each node in turn. Returns
    a list of (node, output, exit_code) tuples in execution order.

    """
    targets = self.nodes

    # put the master at the end of the nodes list (self.nodes itself is
    # reordered)
    master_node = self.sstore.GetMasterNode()
    if master_node in targets:
      targets.remove(master_node)
      targets.append(master_node)

    results = []
    for node in targets:
      run = ssh.SSHCall(node, "root", self.op.command)
      results.append((node, run.output, run.exit_code))

    return results
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and stores it in
    self.instance; an unknown instance raises errors.OpPrereqError.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    found = self.cfg.GetInstanceInfo(full_name)
    if found is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = found

  def Exec(self, feedback_fn):
    """Activate the disks.

    Returns the device information computed by _AssembleInstanceDisks;
    raises errors.OpExecError if the assembly failed.

    """
    assembled, device_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if assembled:
      return device_info
    raise errors.OpExecError("Cannot activate block devices")
2114 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2115 """Prepare the block devices for an instance.
2117 This sets up the block devices on all nodes.
2120 instance: a ganeti.objects.Instance object
2121 ignore_secondaries: if true, errors on secondary nodes won't result
2122 in an error return from the function
2125 false if the operation failed
2126 list of (host, instance_visible_name, node_visible_name) if the operation
2127 suceeded with the mapping from node devices to instance devices
2131 iname = instance.name
2132 # With the two passes mechanism we try to reduce the window of
2133 # opportunity for the race condition of switching DRBD to primary
2134 # before handshaking occured, but we do not eliminate it
2136 # The proper fix would be to wait (with some limits) until the
2137 # connection has been made and drbd transitions from WFConnection
2138 # into any other network-connected state (Connected, SyncTarget,
2141 # 1st pass, assemble on all nodes in secondary mode
2142 for inst_disk in instance.disks:
2143 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2144 cfg.SetDiskID(node_disk, node)
2145 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2147 logger.Error("could not prepare block device %s on node %s"
2148 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2149 if not ignore_secondaries:
2152 # FIXME: race condition on drbd migration to primary
2154 # 2nd pass, do only the primary node
2155 for inst_disk in instance.disks:
2156 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2157 if node != instance.primary_node:
2159 cfg.SetDiskID(node_disk, node)
2160 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2162 logger.Error("could not prepare block device %s on node %s"
2163 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2165 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2167 # leave the disks configured for the primary node
2168 # this is a workaround that would be fixed better by
2169 # improving the logical/physical id handling
2170 for disk in instance.disks:
2171 cfg.SetDiskID(disk, instance.primary_node)
2173 return disks_ok, device_info
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  The disks are assembled with force passed as ignore_secondaries. If
  the assembly fails the disks are shut down again and
  errors.OpExecError is raised; when force is false (but not None) a
  hint about '--force' is logged first.

  """
  assembled, _ = _AssembleInstanceDisks(instance, cfg,
                                        ignore_secondaries=force)
  if assembled:
    return

  _ShutdownInstanceDisks(instance, cfg)
  # None means the caller has no '--force' option to suggest
  if not (force is None or force):
    logger.Error("If the message above refers to a secondary node,"
                 " you can retry the operation using '--force'.")
  raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and stores it in
    self.instance; an unknown instance raises errors.OpPrereqError.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    The disks are only shut down if the primary node could be queried
    and does not list the instance as running; otherwise
    errors.OpExecError is raised.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    # anything but a list means the node did not answer properly
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)
2227 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2228 """Shutdown block devices of an instance.
2230 This does the shutdown on all nodes of the instance.
2232 If the ignore_primary is false, errors on the primary node are
2237 for disk in instance.disks:
2238 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2239 cfg.SetDiskID(top_disk, node)
2240 if not rpc.call_blockdev_shutdown(node, top_disk):
2241 logger.Error("could not shutdown block device %s on node %s" %
2242 (disk.iv_name, node))
2243 if not ignore_primary or node != instance.primary_node:
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  Raises:
    errors.OpPrereqError: if the node cannot be queried, if its answer
      holds no integer 'memory_free' value, or if that value is smaller
      than the requested amount

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  # the per-node entry can be missing or not a dictionary (e.g. a failed
  # call); report that as the documented error instead of failing with a
  # KeyError/AttributeError
  node_data = nodeinfo.get(node)
  if not isinstance(node_data, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = node_data.get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    hook_env = {
      "FORCE": self.op.force,
      }
    hook_env.update(_BuildInstanceHookEnvByObject(self.instance))
    hook_nodes = [self.sstore.GetMasterNode(), self.instance.primary_node]
    hook_nodes = hook_nodes + list(self.instance.secondary_nodes)
    return hook_env, hook_nodes, hook_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its bridges
    exist and that the primary node has enough free memory for it. On
    success the instance is stored in self.instance and the opcode's
    instance_name is replaced by the canonical name.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    found = self.cfg.GetInstanceInfo(full_name)
    if found is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(found)

    start_reason = "starting instance %s" % found.name
    _CheckNodeFreeMemory(self.cfg, found.primary_node, start_reason,
                         found.memory)

    self.instance = found
    self.op.instance_name = found.name

  def Exec(self, feedback_fn):
    """Start the instance.

    The instance is marked up in the configuration before its disks
    are started. If the start call fails the disks are shut down again
    and errors.OpExecError is raised.

    """
    instance = self.instance
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    primary = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, self.op.force)

    started = rpc.call_instance_start(primary, instance, extra_args)
    if not started:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    hook_env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    hook_env.update(_BuildInstanceHookEnvByObject(self.instance))
    hook_nodes = [self.sstore.GetMasterNode(), self.instance.primary_node]
    hook_nodes = hook_nodes + list(self.instance.secondary_nodes)
    return hook_env, hook_nodes, hook_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and that its
    bridges exist. On success the instance is stored in self.instance
    and the opcode's instance_name is replaced by the canonical name.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    found = self.cfg.GetInstanceInfo(full_name)
    if found is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(found)

    self.instance = found
    self.op.instance_name = found.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    Soft and hard reboots are delegated to the primary node's reboot
    call. A full reboot shuts the instance down, restarts its disks and
    starts the instance again. On success the instance is marked up in
    the configuration.

    Raises errors.ParameterError for an unknown reboot type and
    errors.OpExecError if any step fails.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    primary = instance.primary_node

    in_place_types = [constants.INSTANCE_REBOOT_SOFT,
                      constants.INSTANCE_REBOOT_HARD]
    known_types = in_place_types + [constants.INSTANCE_REBOOT_FULL]
    if reboot_type not in known_types:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  tuple(known_types))

    if reboot_type in in_place_types:
      if not rpc.call_instance_reboot(primary, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      # full reboot: stop everything, then bring disks and instance back
      if not rpc.call_instance_shutdown(primary, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(primary, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Logical unit stopping an instance and deactivating its disks.

  """
  # NOTE(review): short lines (docstring ends, "return ...") are missing
  # from the reviewed extract and were restored from context.
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build the hooks environment and node list.

    The node list holds the master, the primary node and all the
    secondary nodes of the instance.

    """
    inst = self.instance
    env = _BuildInstanceHookEnvByObject(inst)
    nodes = [self.sstore.GetMasterNode(), inst.primary_node]
    nodes.extend(inst.secondary_nodes)
    # NOTE(review): return shape (env, pre-hook nodes, post-hook nodes)
    # assumed - confirm against LogicalUnit.BuildHooksEnv
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check that the instance is known to the cluster configuration.

    """
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    inst = self.cfg.GetInstanceInfo(expanded_name)
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Mark the instance down, stop it and shut down its disks.

    A failed shutdown RPC is only logged; the disks are shut down in
    any case.

    """
    inst = self.instance
    pnode = inst.primary_node
    self.cfg.MarkInstanceDown(inst.name)

    stopped = rpc.call_instance_shutdown(pnode, inst)
    if not stopped:
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(inst, self.cfg)
class LUReinstallInstance(LogicalUnit):
  """Logical unit re-running the OS create scripts for a stopped instance.

  """
  # NOTE(review): short lines (docstring ends, "if ...:" guards,
  # "try:"/"finally:", "return ...") are missing from the reviewed
  # extract and were restored from context.
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build the hooks environment and node list.

    The node list holds the master, the primary node and all the
    secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    # NOTE(review): return shape (env, pre-hook nodes, post-hook nodes)
    # assumed - confirm against LogicalUnit.BuildHooksEnv
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    The instance must exist, have disks, be marked down and not be
    reported by its primary node. If the opcode carries an os_type, the
    primary node must also be known and return that OS.

    Raises:
      errors.OpPrereqError: if any of the above checks fails

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    # os_type is optional; normalise a missing attribute to None
    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # name the node that was looked up; this opcode only requires
        # instance_name, so no node field of the opcode can be relied on
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    Optionally switches the instance to the requested OS in the
    configuration, then activates the disks, runs the OS create scripts
    on the primary node and deactivates the disks again.

    Raises:
      errors.OpExecError: if the OS create RPC reports a failure

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # the disks were only activated for the installation
      _ShutdownInstanceDisks(inst, self.cfg)
class LURenameInstance(LogicalUnit):
  """Logical unit renaming a stopped instance.

  """
  # NOTE(review): short lines (docstring ends, "if ...:" guards,
  # "try:"/"finally:", "return ...") are missing from the reviewed
  # extract and were restored from context.
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build the hooks environment and node list.

    Besides the per-instance variables the environment carries
    INSTANCE_NEW_NAME. The node list holds the master, the primary node
    and all the secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    # NOTE(review): return shape (env, pre-hook nodes, post-hook nodes)
    # assumed - confirm against LogicalUnit.BuildHooksEnv
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    The instance must exist, be marked down and not be reported by its
    primary node. The new name is resolved, must not belong to an
    existing instance and, unless the opcode sets ignore_ip, its IP
    must not answer to fping.

    Raises:
      errors.OpPrereqError: if any of the above checks fails

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      # a successful fping means some host already answers on that IP
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    The configuration is updated first; then the disks are activated,
    the OS rename script is run on the primary node and the disks are
    deactivated again. A failing rename script is only logged, since
    the instance has already been renamed in the configuration.

    """
    inst = self.instance
    old_name = inst.name

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      # the disks were only activated for the rename script
      _ShutdownInstanceDisks(inst, self.cfg)
class LURemoveInstance(LogicalUnit):
  """Logical unit removing an instance, its disks and its config entry.

  """
  # NOTE(review): short lines (docstring ends, "else:", "return ...") are
  # missing from the reviewed extract and were restored from context.
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build the hooks environment and node list.

    The node list built here contains only the master node.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    master_only = [self.sstore.GetMasterNode()]
    # NOTE(review): return shape (env, pre-hook nodes, post-hook nodes)
    # assumed - confirm against LogicalUnit.BuildHooksEnv
    return env, master_only, master_only

  def CheckPrereq(self):
    """Check that the instance is known to the cluster configuration.

    """
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    inst = self.cfg.GetInstanceInfo(expanded_name)
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Shut the instance down, remove its disks and drop it from the config.

    A failure to shut down or to remove the disks aborts the operation
    with OpExecError unless the opcode sets ignore_failures, in which
    case only a warning is sent through feedback_fn.

    """
    inst = self.instance
    tolerate = self.op.ignore_failures
    pnode = inst.primary_node

    logger.Info("shutting down instance %s on node %s" % (inst.name, pnode))

    if not rpc.call_instance_shutdown(pnode, inst):
      if not tolerate:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (inst.name, pnode))
      feedback_fn("Warning: can't shutdown instance")

    logger.Info("removing block devices for instance %s" % inst.name)

    if not _RemoveDisks(inst, self.cfg):
      if not tolerate:
        raise errors.OpExecError("Can't remove instance's disks")
      feedback_fn("Warning: can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % inst.name)

    self.cfg.RemoveInstance(inst.name)
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  # NOTE(review): many short lines of Exec (assignments such as
  # 'val = None', "else:" branches, the status strings other than
  # "ERROR_nodedown", the final return) are missing from the reviewed
  # extract; they were restored from context and must be confirmed.
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Validate the requested output fields and expand the instance names.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    static_fields = ["name", "os", "pnode", "snodes",
                     "admin_state", "admin_ram",
                     "disk_template", "ip", "mac", "bridge",
                     "sda_size", "sdb_size", "vcpus", "tags"]
    _CheckOutputFields(static=static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Compute the requested fields for every wanted instance.

    Live data is fetched from the primary nodes only when at least one
    dynamic field was selected. The result is a list with one entry per
    instance, each entry being the list of values in the order of
    self.op.output_fields.

    """
    wanted_names = self.wanted
    instances = [self.cfg.GetInstanceInfo(iname) for iname in wanted_names]

    # begin data gathering

    pnodes = frozenset([inst.primary_node for inst in instances])

    bad_nodes = []
    if not self.dynamic_fields.intersection(self.op.output_fields):
      live_data = dict([(iname, {}) for iname in wanted_names])
    else:
      live_data = {}
      node_data = rpc.call_all_instances_info(pnodes)
      for node in pnodes:
        node_result = node_data[node]
        if node_result:
          live_data.update(node_result)
        elif node_result == False:
          bad_nodes.append(node)
        # any other false value: no instance is alive on that node

    # end data gathering

    # (is running, is administratively up) -> status string
    status_names = {
      (True, True): "running",
      (True, False): "ERROR_up",
      (False, True): "ERROR_down",
      (False, False): "ADMIN_down",
      }

    rows = []
    for inst in instances:
      node_is_bad = inst.primary_node in bad_nodes
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = inst.name
        elif field == "os":
          val = inst.os
        elif field == "pnode":
          val = inst.primary_node
        elif field == "snodes":
          val = list(inst.secondary_nodes)
        elif field == "admin_state":
          val = (inst.status != "down")
        elif field == "oper_state":
          if node_is_bad:
            val = None
          else:
            val = bool(live_data.get(inst.name))
        elif field == "status":
          if node_is_bad:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(inst.name))
            admin_up = inst.status != "down"
            val = status_names[(running, admin_up)]
        elif field == "admin_ram":
          val = inst.memory
        elif field == "oper_ram":
          if node_is_bad:
            val = None
          elif inst.name in live_data:
            val = live_data[inst.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = inst.disk_template
        elif field == "ip":
          val = inst.nics[0].ip
        elif field == "bridge":
          val = inst.nics[0].bridge
        elif field == "mac":
          val = inst.nics[0].mac
        elif field in ("sda_size", "sdb_size"):
          # the first three characters of the field are the disk name
          disk = inst.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = inst.vcpus
        elif field == "tags":
          val = list(inst.GetTags())
        else:
          raise errors.ParameterError(field)
        row.append(val)
      rows.append(row)

    return rows
class LUFailoverInstance(LogicalUnit):
  """Logical unit failing an instance over to its secondary node.

  """
  # NOTE(review): short lines (docstring ends, "else:", "if not disks_ok:",
  # "return ...") are missing from the reviewed extract and were restored
  # from context.
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build the hooks environment and node list.

    The node list built here holds the master and the secondary nodes
    of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    # NOTE(review): return shape (env, pre-hook nodes, post-hook nodes)
    # assumed - confirm against LogicalUnit.BuildHooksEnv
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    The instance must exist and use a network-mirrored disk template.
    Its first secondary node (the failover target) must pass the
    free-memory check and have all the bridges used by the instance.

    Raises:
      errors.OpPrereqError: if one of the checks fails
      errors.ProgrammerError: if a mirrored instance has no secondary

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      # report the actual template: any network-mirrored one gets here
      raise errors.ProgrammerError("no secondary node but using "
                                   "%s disk template" %
                                   instance.disk_template)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary. The instance is only started on the
    new primary if it is marked as up.

    Raises:
      errors.OpExecError: on degraded disks (for a running instance,
        unless ignore_consistency is set), on a failed shutdown (unless
        ignore_consistency is set) and on disk or start failures

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  # NOTE(review): short lines (docstring ends, "else:", "if not
  # migratable:", the "nodes_ip = {" opener, trailing call arguments such
  # as "instance.disks)", "return ...") are missing from the reviewed
  # extract and were restored from context - confirm.
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live"]

  @staticmethod
  def _ResultMessage(result):
    """Return the message part of a (status, message) RPC result.

    The callers test 'not result' before 'result[0]', so a false result
    (no tuple at all) is possible; indexing it would raise TypeError
    and hide the real failure.

    """
    if not result:
      return "no result returned by the node"
    return result[1]

  @staticmethod
  def _AllNodesOk(result, nodes):
    """Tell whether a per-node RPC result reports success on all nodes.

    A false overall result, a false per-node entry or a false status in
    a per-node entry all count as failure.

    """
    if not result:
      return False
    for node in nodes:
      node_result = result[node]
      if not node_result or not node_result[0]:
        return False
    return True

  def BuildHooksEnv(self):
    """Build the hooks environment and node list.

    The node list built here holds the master and the secondary nodes
    of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    # NOTE(review): return shape (env, pre-hook nodes, post-hook nodes)
    # assumed - confirm against LogicalUnit.BuildHooksEnv
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    The instance must exist and use the drbd8 disk template. Its first
    secondary node (the migration target) must pass the free-memory
    check and have all the bridges used by the instance, and the
    primary node must report the instance as migratable.

    Raises:
      errors.OpPrereqError: if one of the checks fails
      errors.ProgrammerError: if a drbd8 instance has no secondary

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "drbd8 disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "migrating instance %s" %
                         instance.name, instance.memory)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    migratable = rpc.call_instance_migratable(instance.primary_node, instance)
    if not migratable:
      raise errors.OpPrereqError("Can't contact node '%s'" %
                                 instance.primary_node)
    if not migratable[0]:
      raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
                                 migratable[1])

    self.instance = instance

  def Exec(self, feedback_fn):
    """Migrate an instance.

    The migrate is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    If the migration itself fails, the disks are reverted to
    single-master mode on a best-effort basis before OpExecError is
    raised. Failures after the migration succeeded are only logged.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]
    all_nodes = [source_node, target_node]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migrate." % dev.iv_name)

    feedback_fn("* ensuring the target is in secondary mode")
    result = rpc.call_blockdev_close(target_node, instance.name,
                                     instance.disks)
    if not result or not result[0]:
      raise errors.BlockDeviceError("Cannot switch target node to"
                                    " secondary: %s" %
                                    self._ResultMessage(result))

    feedback_fn("* changing disks into dual-master mode")
    logger.Info("Changing disks for instance %s into dual-master mode" %
                instance.name)

    nodes_ip = {
      source_node: self.cfg.GetNodeInfo(source_node).secondary_ip,
      target_node: self.cfg.GetNodeInfo(target_node).secondary_ip,
      }
    result = rpc.call_drbd_reconfig_net(all_nodes, instance.name,
                                        instance.disks, nodes_ip, True)
    for node in all_nodes:
      if not result[node] or not result[node][0]:
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error %s" %
                                 (node, self._ResultMessage(result[node])))

    _WaitForSync(self.cfg, instance, self.proc)

    feedback_fn("* migrating instance to %s" % target_node)
    result = rpc.call_instance_migrate(source_node, instance,
                                       nodes_ip[target_node], self.op.live)
    if not result or not result[0]:
      logger.Error("Instance migration failed, trying to revert disk status")
      res2 = rpc.call_blockdev_close(target_node, instance.name,
                                     instance.disks)
      if not res2 or not res2[0]:
        feedback_fn("Disk close on secondary failed, instance disks left"
                    " in dual-master mode")
      else:
        res2 = rpc.call_drbd_reconfig_net(all_nodes, instance.name,
                                          instance.disks, nodes_ip, False)
        if not self._AllNodesOk(res2, all_nodes):
          logger.Error("Warning: DRBD change to single-master failed: shutdown"
                       " manually the instance disks or via deactivate-disks,"
                       " and then restart them via activate-disks")
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, self._ResultMessage(result)))

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    feedback_fn("* changing the instance's disks on source node to secondary")
    result = rpc.call_blockdev_close(source_node, instance.name,
                                     instance.disks)
    if not result or not result[0]:
      logger.Error("Warning: DRBD deactivation failed: shutdown manually the"
                   " instance disks or via deactivate-disks, and then"
                   " restart them via activate-disks")

    _WaitForSync(self.cfg, instance, self.proc)

    feedback_fn("* changing the instance's disks to single-master")
    result = rpc.call_drbd_reconfig_net(all_nodes, instance.name,
                                        instance.disks, nodes_ip, False)
    if not self._AllNodesOk(result, all_nodes):
      logger.Error("Warning: DRBD change to single-master failed: shutdown"
                   " manually the instance disks or via deactivate-disks,"
                   " and then restart them via activate-disks")
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a device and, before it, all its children on the primary node.

  Every device of the tree is created. The function returns False as
  soon as one creation fails and True when the whole tree was created.

  """
  # NOTE(review): the "if device.children:" guard, the "if not new_id:"
  # test and the return statements are missing from the reviewed extract
  # and were restored from context - confirm.
  for child in (device.children or ()):
    child_ok = _CreateBlockDevOnPrimary(cfg, node, instance, child, info)
    if not child_ok:
      return False

  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, True, info)
  if not created_id:
    return False
  # keep an already assigned physical id
  if device.physical_id is None:
    device.physical_id = created_id
  return True
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create the needed part of a block device tree on a secondary node.

  A device whose CreateOnSecondary() is true forces the creation of
  itself and of everything below it. Otherwise the children are visited
  with the 'force' value received from the caller, and the device
  itself is only created when 'force' is set.

  Returns False as soon as one creation fails, True otherwise.

  """
  # NOTE(review): "force = True", the "if device.children:" guard, the
  # "if not force:" shortcut, the "if not new_id:" test and the return
  # statements are missing from the reviewed extract and were restored
  # from context - confirm.
  if device.CreateOnSecondary():
    force = True

  for child in (device.children or ()):
    child_ok = _CreateBlockDevOnSecondary(cfg, node, instance,
                                          child, force, info)
    if not child_ok:
      return False

  if not force:
    # nothing to create for this device on a secondary
    return True

  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, False, info)
  if not created_id:
    return False
  # keep an already assigned physical id
  if device.physical_id is None:
    device.physical_id = created_id
  return True
def _GenerateUniqueNames(cfg, exts):
  """Build one unique name per requested extension.

  For each entry of 'exts', in order, a new ID is requested from
  cfg.GenerateUniqueID() and the extension is appended to it.

  Returns the list of generated names, in the order of 'exts'.

  """
  return ["%s%s" % (cfg.GenerateUniqueID(), ext) for ext in exts]
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Build an LD_DRBD7 disk object on top of a data and a metadata LV.

  names[0] and names[1] are the LV names of the data volume (of the
  given size) and of the metadata volume (size 128). A port is
  allocated from the configuration for the drbd device.

  """
  drbd_port = cfg.AllocatePort()
  vg = cfg.GetVGName()
  data_lv = objects.Disk(dev_type=constants.LD_LV, size=size,
                         logical_id=(vg, names[0]))
  meta_lv = objects.Disk(dev_type=constants.LD_LV, size=128,
                         logical_id=(vg, names[1]))
  # NOTE(review): the return line is missing from the reviewed extract;
  # returning the drbd device is assumed - confirm
  return objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                      logical_id=(primary, secondary, drbd_port),
                      children=[data_lv, meta_lv])
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Build an LD_DRBD8 disk object on top of a data and a metadata LV.

  names[0] and names[1] are the LV names of the data volume (of the
  given size) and of the metadata volume (size 128). A port is
  allocated from the configuration for the drbd device.

  """
  drbd_port = cfg.AllocatePort()
  vg = cfg.GetVGName()
  data_lv = objects.Disk(dev_type=constants.LD_LV, size=size,
                         logical_id=(vg, names[0]))
  meta_lv = objects.Disk(dev_type=constants.LD_LV, size=128,
                         logical_id=(vg, names[1]))
  # NOTE(review): the last keyword argument and the return line are
  # missing from the reviewed extract; iv_name=iv_name and returning the
  # drbd device are assumed - confirm
  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=(primary, secondary, drbd_port),
                      children=[data_lv, meta_lv],
                      iv_name=iv_name)
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Build the list of disk objects for the given disk template.

  Every template with disks yields two devices, "sda" (disk_sz) and
  "sdb" (swap_sz). The plain and local_raid1 templates accept no
  secondary node, remote_raid1 and drbd8 need exactly one; anything
  else is a ProgrammerError, as is an unknown template name.

  """
  # NOTE(review): several short lines (the diskless result, the iv_name
  # of the plain LVs, the size of the local MD devices, the final
  # "else:"/return) are missing from the reviewed extract and were
  # restored from context - confirm.
  #TODO: compute space requirements

  vg = cfg.GetVGName()

  if template_name == constants.DT_DISKLESS:
    return []

  if template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    lv_names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    data_lv = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vg, lv_names[0]),
                           iv_name="sda")
    swap_lv = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vg, lv_names[1]),
                           iv_name="sdb")
    return [data_lv, swap_lv]

  if template_name == constants.DT_LOCAL_RAID1:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    lv_names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
                                          ".sdb_m1", ".sdb_m2"])
    # each MD device mirrors two LVs of the same size
    data_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vg, lv_names[0]))
    data_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vg, lv_names[1]))
    data_md = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                           size=disk_sz,
                           children=[data_m1, data_m2])
    swap_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vg, lv_names[2]))
    swap_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vg, lv_names[3]))
    swap_md = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                           size=swap_sz,
                           children=[swap_m1, swap_m2])
    return [data_md, swap_md]

  if template_name == constants.DT_REMOTE_RAID1:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    peer = secondary_nodes[0]
    lv_names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                          ".sdb_data", ".sdb_meta"])
    # an MD device with a single drbd child per disk
    data_drbd = _GenerateMDDRBDBranch(cfg, primary_node, peer,
                                      disk_sz, lv_names[0:2])
    data_md = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                           children=[data_drbd], size=disk_sz)
    swap_drbd = _GenerateMDDRBDBranch(cfg, primary_node, peer,
                                      swap_sz, lv_names[2:4])
    swap_md = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                           children=[swap_drbd], size=swap_sz)
    return [data_md, swap_md]

  if template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    peer = secondary_nodes[0]
    lv_names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                          ".sdb_data", ".sdb_meta"])
    data_drbd = _GenerateDRBD8Branch(cfg, primary_node, peer,
                                     disk_sz, lv_names[0:2], "sda")
    swap_drbd = _GenerateDRBD8Branch(cfg, primary_node, peer,
                                     swap_sz, lv_names[2:4], "sdb")
    return [data_drbd, swap_drbd]

  raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
def _GetInstanceInfoText(instance):
  """Return the text stored in the metadata of the instance's disks.

  The text is the fixed prefix "originstname+" followed by the instance
  name.

  """
  owner_name = instance.name
  return "originstname+%s" % owner_name
def _CreateDisks(cfg, instance):
  """Create all the block devices of an instance.

  For each disk, the device tree is first created on every secondary
  node and then on the primary node. The first failure is logged and
  stops the whole process.

  Args:
    cfg: the configuration object
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  # NOTE(review): the "return False" / "return True" lines and the last
  # argument of the primary-node error message are missing from the
  # reviewed extract and were restored from context - confirm.
  disk_info = _GetInstanceInfoText(instance)
  pnode = instance.primary_node

  for dev in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (dev.iv_name, instance.name))

    # secondaries first, then the primary
    for snode in instance.secondary_nodes:
      created = _CreateBlockDevOnSecondary(cfg, snode, instance,
                                           dev, False, disk_info)
      if not created:
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (dev.iv_name, dev, snode))
        return False

    created = _CreateBlockDevOnPrimary(cfg, pnode, instance, dev, disk_info)
    if not created:
      logger.Error("failed to create volume %s on primary!" %
                   dev.iv_name)
      return False

  return True
def _RemoveDisks(instance, cfg):
  """Remove all the block devices of an instance.

  Every (node, disk) pair of each disk's node tree is removed through
  the blockdev_remove RPC. Unlike `_CreateDisks()`, a failure does not
  stop the process: it is logged and the removal continues with the
  remaining devices.

  Args:
    instance: the instance object
    cfg: the configuration object

  Returns:
    True if every device was removed, False if at least one removal
    failed

  """
  # NOTE(review): the result bookkeeping lines ("result = True",
  # "result = False", "return result") are missing from the reviewed
  # extract and were restored from context - confirm.
  logger.Info("removing block devices for instance %s" % instance.name)

  all_removed = True
  for top_dev in instance.disks:
    for node, disk in top_dev.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      removed = rpc.call_blockdev_remove(node, disk)
      if not removed:
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (top_dev.iv_name, node))
        all_removed = False
  return all_removed
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Return the volume group space needed by a disk template.

  The computation is hard-coded for the two-drive (disk plus swap)
  layout. The diskless template needs no space and yields None.

  Raises:
    errors.ProgrammerError: if the template has no known requirement

  """
  payload = disk_size + swap_size

  # required free space per template
  required = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: payload,
    # both mirror halves live in the volume group
    constants.DT_LOCAL_RAID1: payload * 2,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_REMOTE_RAID1: payload + 256,
    constants.DT_DRBD8: payload + 256,
    }

  if disk_template in required:
    return required[disk_template]

  raise errors.ProgrammerError("Disk template '%s' size requirement"
                               " is unknown" % disk_template)
3336 class LUCreateInstance(LogicalUnit):
3337 """Create an instance.
3340 HPATH = "instance-add"
3341 HTYPE = constants.HTYPE_INSTANCE
3342 _OP_REQP = ["instance_name", "mem_size", "disk_size",
3343 "disk_template", "swap_size", "mode", "start", "vcpus",
3344 "wait_for_sync", "ip_check", "mac"]
3346 def _RunAllocator(self):
3347 """Run the allocator based on input opcode.
3350 disks = [{"size": self.op.disk_size, "mode": "w"},
3351 {"size": self.op.swap_size, "mode": "w"}]
3352 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3353 "bridge": self.op.bridge}]
3354 ial = IAllocator(self.cfg, self.sstore,
3355 mode=constants.IALLOCATOR_MODE_ALLOC,
3356 name=self.op.instance_name,
3357 disk_template=self.op.disk_template,
3360 vcpus=self.op.vcpus,
3361 mem_size=self.op.mem_size,
3366 ial.Run(self.op.iallocator)
3369 raise errors.OpPrereqError("Can't compute nodes using"
3370 " iallocator '%s': %s" % (self.op.iallocator,
3372 if len(ial.nodes) != ial.required_nodes:
3373 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3374 " of nodes (%s), required %s" %
3375 (len(ial.nodes), ial.required_nodes))
3376 self.op.pnode = ial.nodes[0]
3377 logger.ToStdout("Selected nodes for the instance: %s" %
3378 (", ".join(ial.nodes),))
3379 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3380 (self.op.instance_name, self.op.iallocator, ial.nodes))
3381 if ial.required_nodes == 2:
3382 self.op.snode = ial.nodes[1]
3384 def BuildHooksEnv(self):
3387 This runs on master, primary and secondary nodes of the instance.
3391 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3392 "INSTANCE_DISK_SIZE": self.op.disk_size,
3393 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3394 "INSTANCE_ADD_MODE": self.op.mode,
3396 if self.op.mode == constants.INSTANCE_IMPORT:
3397 env["INSTANCE_SRC_NODE"] = self.op.src_node
3398 env["INSTANCE_SRC_PATH"] = self.op.src_path
3399 env["INSTANCE_SRC_IMAGE"] = self.src_image
3401 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3402 primary_node=self.op.pnode,
3403 secondary_nodes=self.secondaries,
3404 status=self.instance_status,
3405 os_type=self.op.os_type,
3406 memory=self.op.mem_size,
3407 vcpus=self.op.vcpus,
3408 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3411 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
def CheckPrereq(self):
  """Check prerequisites.

  Validates the opcode of an instance creation/import and fills in the
  values the later steps rely on. On success the following are set:
  self.op.instance_name (resolved name), self.inst_ip and self.op.ip,
  self.op.bridge, self.op.pnode and self.pnode, self.secondaries and
  self.instance_status; in import mode also self.op.src_node,
  self.op.os_type and self.src_image.

  Raises:
    errors.OpPrereqError: for any invalid or inconsistent parameter
    errors.ProgrammerError: if the export data lacks its main section

  """
  # set optional parameters to none if they don't exist
  for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
               "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
               "vnc_bind_address"]:
    if not hasattr(self.op, attr):
      setattr(self.op, attr, None)

  if self.op.mode not in (constants.INSTANCE_CREATE,
                          constants.INSTANCE_IMPORT):
    raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                               self.op.mode)

  if self.op.mode == constants.INSTANCE_IMPORT:
    # an import needs a known source node and an absolute export path
    src_node = getattr(self.op, "src_node", None)
    src_path = getattr(self.op, "src_path", None)
    if src_node is None or src_path is None:
      raise errors.OpPrereqError("Importing an instance requires source"
                                 " node and path options")
    src_node_full = self.cfg.ExpandNodeName(src_node)
    if src_node_full is None:
      raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
    self.op.src_node = src_node = src_node_full

    if not os.path.isabs(src_path):
      raise errors.OpPrereqError("The source path must be absolute")

    # the export description is read from the source node
    export_info = rpc.call_export_info(src_node, src_path)

    if not export_info:
      raise errors.OpPrereqError("No export found in dir %s" % src_path)

    if not export_info.has_section(constants.INISECT_EXP):
      raise errors.ProgrammerError("Corrupted export config")

    ei_version = export_info.get(constants.INISECT_EXP, 'version')
    if (int(ei_version) != constants.EXPORT_VERSION):
      raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                 (ei_version, constants.EXPORT_VERSION))

    if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
      raise errors.OpPrereqError("Can't import instance with more than"
                                 " one data disk")

    # FIXME: are the old os-es, disk sizes, etc. useful?
    # the OS type comes from the export, not from the opcode
    self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
    diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                       'disk0_dump'))
    self.src_image = diskimage
  else: # INSTANCE_CREATE
    if getattr(self.op, "os_type", None) is None:
      raise errors.OpPrereqError("No guest OS specified")

  #### instance parameters check

  # disk template and mirror node verification
  if self.op.disk_template not in constants.DISK_TEMPLATES:
    raise errors.OpPrereqError("Invalid disk template name")

  # instance name verification
  hostname1 = utils.HostInfo(self.op.instance_name)

  # from here on the opcode carries the name reported by HostInfo
  self.op.instance_name = instance_name = hostname1.name
  instance_list = self.cfg.GetInstanceList()
  if instance_name in instance_list:
    raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                               instance_name)

  # ip validity checks
  # "none" (or no value) means no IP, "auto" takes the IP of the
  # resolved instance name, anything else must be a valid IP
  ip = getattr(self.op, "ip", None)
  if ip is None or ip.lower() == "none":
    inst_ip = None
  elif ip.lower() == "auto":
    inst_ip = hostname1.ip
  else:
    if not utils.IsValidIP(ip):
      raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                 " like a valid IP" % ip)
    inst_ip = ip
  self.inst_ip = self.op.ip = inst_ip

  if self.op.start and not self.op.ip_check:
    raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                               " adding an instance in start mode")

  if self.op.ip_check:
    # note that the probed address is the one of the resolved name
    if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                 (hostname1.ip, instance_name))

  # MAC address verification
  if self.op.mac != "auto":
    if not utils.IsValidMac(self.op.mac.lower()):
      raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                 self.op.mac)

  # bridge verification
  bridge = getattr(self.op, "bridge", None)
  if bridge is None:
    self.op.bridge = self.cfg.GetDefBridge()
  else:
    self.op.bridge = bridge

  # boot order verification
  if self.op.hvm_boot_order is not None:
    # stripping all the allowed letters must leave nothing behind
    if len(self.op.hvm_boot_order.strip("acdn")) != 0:
      raise errors.OpPrereqError("invalid boot order specified,"
                                 " must be one or more of [acdn]")

  #### allocator run
  # exactly one of the two values must be None
  if [self.op.iallocator, self.op.pnode].count(None) != 1:
    raise errors.OpPrereqError("One and only one of iallocator and primary"
                               " node must be given")

  if self.op.iallocator is not None:
    self._RunAllocator()

  #### node related checks

  # check primary node
  pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
  if pnode is None:
    raise errors.OpPrereqError("Primary node '%s' is unknown" %
                               self.op.pnode)
  self.op.pnode = pnode.name
  self.pnode = pnode
  self.secondaries = []

  # mirror node verification
  if self.op.disk_template in constants.DTS_NET_MIRROR:
    if getattr(self.op, "snode", None) is None:
      raise errors.OpPrereqError("The networked disk templates need"
                                 " a mirror node")

    snode_name = self.cfg.ExpandNodeName(self.op.snode)
    if snode_name is None:
      raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                 self.op.snode)
    elif snode_name == pnode.name:
      raise errors.OpPrereqError("The secondary node cannot be"
                                 " the primary node.")
    self.secondaries.append(snode_name)

  req_size = _ComputeDiskSize(self.op.disk_template,
                              self.op.disk_size, self.op.swap_size)

  # Check lv size requirements
  # a req_size of None skips the free space verification altogether
  if req_size is not None:
    nodenames = [pnode.name] + self.secondaries
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if req_size > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s."
                                   " %d MB available, %d MB required" %
                                   (node, info['vg_free'], req_size))

  # os verification
  os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
  if not os_obj:
    raise errors.OpPrereqError("OS '%s' not in supported os list for"
                               " primary node" % self.op.os_type)

  if self.op.kernel_path == constants.VALUE_NONE:
    raise errors.OpPrereqError("Can't set instance kernel to none")

  # bridge check on primary node
  if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
    raise errors.OpPrereqError("target bridge '%s' does not exist on"
                               " destination node '%s'" %
                               (self.op.bridge, pnode.name))

  # memory check on primary node
  # only done when the instance is going to be started right away
  if self.op.start:
    _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                         "creating instance %s" % self.op.instance_name,
                         self.op.mem_size)

  # hvm_cdrom_image_path verification
  # NOTE(review): os.path is evaluated by the process running this LU,
  # so the image is looked up locally - confirm this is intended
  if self.op.hvm_cdrom_image_path is not None:
    if not os.path.isabs(self.op.hvm_cdrom_image_path):
      raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                 " be an absolute path or None, not %s" %
                                 self.op.hvm_cdrom_image_path)
    if not os.path.isfile(self.op.hvm_cdrom_image_path):
      raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                 " regular file or a symlink pointing to"
                                 " an existing regular file, not %s" %
                                 self.op.hvm_cdrom_image_path)

  # vnc_bind_address verification
  if self.op.vnc_bind_address is not None:
    if not utils.IsValidIP(self.op.vnc_bind_address):
      raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                 " like a valid IP address" %
                                 self.op.vnc_bind_address)

  # the status recorded for the new instance follows the start request
  if self.op.start:
    self.instance_status = 'up'
  else:
    self.instance_status = 'down'
def Exec(self, feedback_fn):
  """Create the instance, register it in the cluster and set it up.

  The steps are: build the NIC, disk and instance objects, create the
  disks, add the instance to the configuration, check the disk sync
  status, run the OS create or import scripts (unless the instance is
  diskless) and finally start the instance if this was requested.

  Args:
    feedback_fn: callable taking a message, used to report progress

  Raises:
    errors.OpExecError: if any of the creation steps fails
    errors.ProgrammerError: for an unknown creation mode

  """
  op = self.op
  cfg = self.cfg
  inst_name = op.instance_name
  primary = self.pnode.name

  # "auto" asks for a MAC generated by the configuration
  mac_address = op.mac
  if mac_address == "auto":
    mac_address = cfg.GenerateMAC()

  nic = objects.NIC(bridge=op.bridge, mac=mac_address)
  if self.inst_ip is not None:
    nic.ip = self.inst_ip

  # a network port is allocated only for the hypervisors needing one
  network_port = None
  if self.sstore.GetHypervisorType() in constants.HTS_REQ_PORT:
    network_port = cfg.AllocatePort()

  if op.vnc_bind_address is None:
    op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

  disks = _GenerateDiskTemplate(cfg, op.disk_template, inst_name, primary,
                                self.secondaries, op.disk_size,
                                op.swap_size)

  iobj = objects.Instance(
    name=inst_name,
    os=op.os_type,
    primary_node=primary,
    memory=op.mem_size,
    vcpus=op.vcpus,
    nics=[nic],
    disks=disks,
    disk_template=op.disk_template,
    status=self.instance_status,
    network_port=network_port,
    kernel_path=op.kernel_path,
    initrd_path=op.initrd_path,
    hvm_boot_order=op.hvm_boot_order,
    hvm_acpi=op.hvm_acpi,
    hvm_pae=op.hvm_pae,
    hvm_cdrom_image_path=op.hvm_cdrom_image_path,
    vnc_bind_address=op.vnc_bind_address,
    )

  feedback_fn("* creating instance disks...")
  disks_created = _CreateDisks(cfg, iobj)
  if not disks_created:
    _RemoveDisks(iobj, cfg)
    raise errors.OpExecError("Device creation failed, reverting...")

  feedback_fn("adding instance %s to cluster config" % inst_name)
  cfg.AddInstance(iobj)

  disk_abort = False
  if op.wait_for_sync:
    disk_abort = not _WaitForSync(cfg, iobj, self.proc)
  elif iobj.disk_template in constants.DTS_NET_MIRROR:
    # make sure the disks are not degraded (still sync-ing is ok)
    time.sleep(15)
    feedback_fn("* checking mirrors status")
    disk_abort = not _WaitForSync(cfg, iobj, self.proc, oneshot=True)

  if disk_abort:
    # undo both the disk creation and the configuration change
    _RemoveDisks(iobj, cfg)
    cfg.RemoveInstance(iobj.name)
    raise errors.OpExecError("There are some degraded disks for"
                             " this instance")

  feedback_fn("creating os for instance %s on node %s" %
              (inst_name, primary))

  if iobj.disk_template != constants.DT_DISKLESS:
    if op.mode == constants.INSTANCE_CREATE:
      feedback_fn("* running the instance OS create scripts...")
      os_added = rpc.call_instance_os_add(primary, iobj, "sda", "sdb")
      if not os_added:
        raise errors.OpExecError("could not add os for instance %s"
                                 " on node %s" %
                                 (inst_name, primary))
    elif op.mode == constants.INSTANCE_IMPORT:
      feedback_fn("* running the instance OS import scripts...")
      os_imported = rpc.call_instance_os_import(primary, iobj, "sda", "sdb",
                                                op.src_node, self.src_image)
      if not os_imported:
        raise errors.OpExecError("Could not import os for instance"
                                 " %s on node %s" %
                                 (inst_name, primary))
    else:
      # also checked in the prereq part
      raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                   % op.mode)

  if not op.start:
    return

  logger.Info("starting instance %s on node %s" % (inst_name, primary))
  feedback_fn("* starting instance...")
  if not rpc.call_instance_start(primary, iobj, None):
    raise errors.OpExecError("Could not start instance")
class LUConnectConsole(NoHooksLU):
  """Build the command needed to reach an instance's console.

  This logical unit is somewhat special: instead of acting on the
  instance it returns the command line that has to be run on the
  master node in order to connect to the console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check that the instance is known to the cluster.

    The instance object is stored in self.instance.

    Raises:
      errors.OpPrereqError: if the instance is not in the configuration

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Compute the ssh command line for the instance's console.

    Returns:
      a tuple of the program name ("ssh") and its argument list

    Raises:
      errors.OpExecError: if the primary node cannot be queried or the
        instance is not in its list of running instances

    """
    inst = self.instance
    pnode = inst.primary_node

    # the result is indexed by node name; False marks a failed query
    running = rpc.call_instance_list([pnode])[pnode]
    if running is False:
      raise errors.OpExecError("Can't connect to node %s." % pnode)

    if inst.name not in running:
      raise errors.OpExecError("Instance %s is not running." % inst.name)

    logger.Debug("connecting to console of %s on %s" % (inst.name, pnode))

    console_cmd = hypervisor.GetHypervisor().GetShellCommandForConsole(inst)

    # ssh options first, then the target node and the remote command
    argv = ["ssh", "-q", "-t"]
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
    argv.extend(ssh.BATCH_MODE_OPTS)
    argv.extend([pnode, console_cmd])
    return "ssh", argv
class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Assemble the hooks environment.

    The hooks run on the master, the primary, the new secondary and
    all the current secondaries of the instance.

    Returns:
      a tuple of the environment dictionary and the list of node names

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))

    hook_nodes = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      self.op.remote_node,
      ]
    return env, hook_nodes + list(self.instance.secondary_nodes)

  def CheckPrereq(self):
    """Check the instance, the new node and the target disk.

    On success self.instance, self.remote_node and self.disk are set.

    Raises:
      errors.OpPrereqError: if the instance or the node is unknown, the
        node is the primary node, the disk template is not remote_raid1,
        the disk is not found or it already has two children

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    # the first disk carrying the requested name is the one we extend
    candidates = [dsk for dsk in instance.disks
                  if dsk.iv_name == self.op.disk_name]
    if not candidates:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    disk = candidates[0]

    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave devices."
                                 " This would create a 3-disk raid1 which we"
                                 " don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Create the new mirror branch and attach it to the disk.

    The new drbd device is created on the secondary and then on the
    primary node, after which it is added as a child of the md device;
    on failure, the devices created so far are removed again.

    Returns:
      0, after the instance has been saved and the sync awaited

    Raises:
      errors.OpExecError: if one of the creation steps fails

    """
    cfg = self.cfg
    disk = self.disk
    instance = self.instance
    target = self.remote_node
    primary = instance.primary_node

    wanted = [".%s_%s" % (disk.iv_name, suffix)
              for suffix in ["data", "meta"]]
    names = _GenerateUniqueNames(cfg, wanted)
    new_drbd = _GenerateMDDRBDBranch(cfg, primary, target, disk.size, names)

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    on_secondary = _CreateBlockDevOnSecondary(cfg, target, instance,
                                              new_drbd, False,
                                              _GetInstanceInfoText(instance))
    if not on_secondary:
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % target)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    on_primary = _CreateBlockDevOnPrimary(cfg, primary, instance, new_drbd,
                                          _GetInstanceInfoText(instance))
    if not on_primary:
      # remove secondary dev
      cfg.SetDiskID(new_drbd, target)
      rpc.call_blockdev_remove(target, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(primary, disk, [new_drbd]):
      logger.Error("Can't add mirror compoment to md!")
      # best-effort rollback, secondary first and then primary
      cfg.SetDiskID(new_drbd, target)
      if not rpc.call_blockdev_remove(target, new_drbd):
        logger.Error("Can't rollback on secondary")
      cfg.SetDiskID(new_drbd, primary)
      if not rpc.call_blockdev_remove(primary, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)
    cfg.AddInstance(instance)
    _WaitForSync(cfg, instance, self.proc)

    return 0
class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove one mirror component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Assemble the hooks environment.

    The hooks run on the master, the primary and all the secondaries.

    Returns:
      a tuple of the environment dictionary and the list of node names

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))

    hook_nodes = [self.sstore.GetMasterNode(), self.instance.primary_node]
    return env, hook_nodes + list(self.instance.secondary_nodes)

  def CheckPrereq(self):
    """Check the instance and locate the component to be removed.

    On success self.instance, self.disk, self.child and
    self.old_secondary are set.

    Raises:
      errors.OpPrereqError: if the instance is unknown, its template is
        not remote_raid1, the disk or the component cannot be found, or
        the component is the last one of the mirror

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    wanted_name = self.op.disk_name
    for disk in instance.disks:
      if disk.iv_name == wanted_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)

    # the component is a drbd7 child whose third logical_id element
    # matches the requested disk_id
    for child in disk.children:
      is_drbd7 = child.dev_type == constants.LD_DRBD7
      if is_drbd7 and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child

    # of the two nodes in the logical_id, the old secondary is the one
    # which is not the primary node of the instance
    first_node, second_node = child.logical_id[:2]
    if first_node == instance.primary_node:
      self.old_secondary = second_node
    else:
      self.old_secondary = first_node

  def Exec(self, feedback_fn):
    """Detach the component from the mirror and remove its devices.

    A failure to remove the device from one of its nodes is only
    logged; the instance configuration is saved at the end.

    Raises:
      errors.OpExecError: if the child cannot be detached from the mirror

    """
    cfg = self.cfg
    instance = self.instance
    disk = self.disk
    child = self.child
    primary = instance.primary_node

    logger.Info("remove mirror component")
    cfg.SetDiskID(disk, primary)
    detached = rpc.call_blockdev_removechildren(primary, disk, [child])
    if not detached:
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      cfg.SetDiskID(child, node)
      removed = rpc.call_blockdev_remove(node, child)
      if not removed:
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    cfg.AddInstance(instance)
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  # hooks path and hooks type; every LogicalUnit subclass with hooks
  # has to redefine both of them
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  # opcode attributes which are required; the LogicalUnit constructor
  # raises OpPrereqError when one of them is missing
  _OP_REQP = ["instance_name", "mode", "disks"]
def _RunAllocator(self):
  """Compute a new secondary node using an IAllocator.

  The allocator named by self.op.iallocator is run in relocation mode
  for the instance, relocating from the current secondary
  (self.sec_node). The first node it returns is stored in
  self.op.remote_node.

  Returns:
    the selected node, i.e. the value stored in self.op.remote_node;
    this keeps callers which assign the result working

  Raises:
    errors.OpPrereqError: if the allocator run was not successful or
      it returned an unexpected number of nodes

  """
  ial = IAllocator(self.cfg, self.sstore,
                   mode=constants.IALLOCATOR_MODE_RELOC,
                   name=self.op.instance_name,
                   relocate_from=[self.sec_node])

  ial.Run(self.op.iallocator)

  if not ial.success:
    raise errors.OpPrereqError("Can't compute nodes using"
                               " iallocator '%s': %s" % (self.op.iallocator,
                                                         ial.info))
  if len(ial.nodes) != ial.required_nodes:
    # the message has three placeholders, so the allocator name must be
    # passed too (otherwise the formatting itself raises TypeError)
    raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                               " of nodes (%s), required %s" %
                               (self.op.iallocator, len(ial.nodes),
                                ial.required_nodes))
  self.op.remote_node = ial.nodes[0]
  logger.ToStdout("Selected new secondary for the instance: %s" %
                  self.op.remote_node)
  return self.op.remote_node
def BuildHooksEnv(self):
  """Assemble the hooks environment for a disk replacement.

  The hooks run on the master and on the primary node, plus on the
  new secondary node when one has been given.

  Returns:
    a tuple of the environment dictionary and the list of node names

  """
  instance = self.instance
  new_secondary = self.op.remote_node

  env = {
    "MODE": self.op.mode,
    "NEW_SECONDARY": new_secondary,
    "OLD_SECONDARY": instance.secondary_nodes[0],
    }
  env.update(_BuildInstanceHookEnvByObject(instance))

  hook_nodes = [self.sstore.GetMasterNode(), instance.primary_node]
  if new_secondary is not None:
    hook_nodes.append(new_secondary)
  return env, hook_nodes
def CheckPrereq(self):
  """Check prerequisites.

  This checks that the instance is in the cluster and that the
  requested replacement is possible for its disk template.

  On success it sets self.instance, self.sec_node and
  self.remote_node_info, and updates self.op.instance_name,
  self.op.remote_node and (possibly) self.op.mode. For the drbd8
  template it also sets self.tgt_node and self.oth_node, plus
  self.new_node in secondary replacement mode.

  Raises:
    errors.OpPrereqError: for invalid or inconsistent parameters
    errors.ProgrammerError: for an unhandled replacement mode

  """
  if not hasattr(self.op, "remote_node"):
    self.op.remote_node = None

  instance = self.cfg.GetInstanceInfo(
    self.cfg.ExpandInstanceName(self.op.instance_name))
  if instance is None:
    raise errors.OpPrereqError("Instance '%s' not known" %
                               self.op.instance_name)
  self.instance = instance
  self.op.instance_name = instance.name

  if instance.disk_template not in constants.DTS_NET_MIRROR:
    raise errors.OpPrereqError("Instance's disk layout is not"
                               " network mirrored.")

  if len(instance.secondary_nodes) != 1:
    raise errors.OpPrereqError("The instance has a strange layout,"
                               " expected one secondary but found %d" %
                               len(instance.secondary_nodes))

  self.sec_node = instance.secondary_nodes[0]

  ia_name = getattr(self.op, "iallocator", None)
  if ia_name is not None:
    if self.op.remote_node is not None:
      raise errors.OpPrereqError("Give either the iallocator or the new"
                                 " secondary, not both")
    # _RunAllocator stores its choice in self.op.remote_node itself;
    # its result must not be assigned here, as that would replace the
    # selected node with the method's return value
    self._RunAllocator()

  remote_node = self.op.remote_node
  if remote_node is not None:
    remote_node = self.cfg.ExpandNodeName(remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" %
                                 self.op.remote_node)
    self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
  else:
    self.remote_node_info = None
  if remote_node == instance.primary_node:
    raise errors.OpPrereqError("The specified node is the primary node of"
                               " the instance.")
  elif remote_node == self.sec_node:
    if self.op.mode == constants.REPLACE_DISK_SEC:
      # this is for DRBD8, where we can't execute the same mode of
      # replacement as for drbd7 (no different port allocated)
      raise errors.OpPrereqError("Same secondary given, cannot execute"
                                 " replacement")
    # the user gave the current secondary, switch to
    # 'no-replace-secondary' mode for drbd7
    remote_node = None
  if (instance.disk_template == constants.DT_REMOTE_RAID1 and
      self.op.mode != constants.REPLACE_DISK_ALL):
    raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                               " disks replacement, not individual ones")
  if instance.disk_template == constants.DT_DRBD8:
    if (self.op.mode == constants.REPLACE_DISK_ALL and
        remote_node is not None):
      # switch to replace secondary mode
      self.op.mode = constants.REPLACE_DISK_SEC

    if self.op.mode == constants.REPLACE_DISK_ALL:
      raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                 " secondary disk replacement, not"
                                 " both at once")
    elif self.op.mode == constants.REPLACE_DISK_PRI:
      if remote_node is not None:
        raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                   " the secondary while doing a primary"
                                   " node disk replacement")
      # the disks are replaced on the primary, the secondary is the peer
      self.tgt_node = instance.primary_node
      self.oth_node = instance.secondary_nodes[0]
    elif self.op.mode == constants.REPLACE_DISK_SEC:
      self.new_node = remote_node # this can be None, in which case
                                  # we don't change the secondary
      self.tgt_node = instance.secondary_nodes[0]
      self.oth_node = instance.primary_node
    else:
      raise errors.ProgrammerError("Unhandled disk replace mode")

  for name in self.op.disks:
    if instance.FindDisk(name) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (name, instance.name))
  # store the expanded name (or None, if the secondary is kept)
  self.op.remote_node = remote_node
def _ExecRR1(self, feedback_fn):
  """Replace all the mirror branches of an instance's disks.

  Presumably this is the remote_raid1 flavour of the replacement
  (judging by the name; the dispatch is not part of this method).

  For every disk a new drbd branch is created on the target secondary
  and on the primary node and then added to the md device. After the
  sync, all devices are checked for degradation and the first child of
  each disk (the old branch) is detached and removed from its nodes.

  Raises:
    errors.OpExecError: if creating or attaching a new branch fails,
      or if a device is degraded after the sync

  """
  cfg = self.cfg
  instance = self.instance
  primary = instance.primary_node

  # without a new secondary the branch is rebuilt on the current one
  remote_node = self.op.remote_node
  if remote_node is None:
    remote_node = self.sec_node

  # maps the disk name to (disk, old branch, new branch)
  replaced = {}
  for dev in instance.disks:
    wanted = [".%s_%s" % (dev.iv_name, suffix)
              for suffix in ["data", "meta"]]
    names = _GenerateUniqueNames(cfg, wanted)
    new_drbd = _GenerateMDDRBDBranch(cfg, primary, remote_node,
                                     dev.size, names)
    replaced[dev.iv_name] = (dev, dev.children[0], new_drbd)

    logger.Info("adding new mirror component on secondary for %s" %
                dev.iv_name)
    #HARDCODE
    on_secondary = _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                              new_drbd, False,
                                              _GetInstanceInfoText(instance))
    if not on_secondary:
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s. Full abort, cleanup manually!" %
                               remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    on_primary = _CreateBlockDevOnPrimary(cfg, primary, instance, new_drbd,
                                          _GetInstanceInfoText(instance))
    if not on_primary:
      # remove secondary dev
      cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary!"
                               " Full abort, cleanup manually!!")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(primary, dev, [new_drbd]):
      logger.Error("Can't add mirror compoment to md!")
      # best-effort rollback, secondary first and then primary
      cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      cfg.SetDiskID(new_drbd, primary)
      if not rpc.call_blockdev_remove(primary, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Full abort, cleanup manually!!")

    dev.children.append(new_drbd)
    cfg.AddInstance(instance)

  # this can fail as the old devices are degraded and _WaitForSync
  # does a combined result over all disks, so we don't check its
  # return value
  _WaitForSync(cfg, instance, self.proc, unlock=True)

  # so check manually all the devices
  for name, (dev, child, new_drbd) in replaced.items():
    cfg.SetDiskID(dev, primary)
    # element 5 of the find result is used as the degraded flag
    if rpc.call_blockdev_find(primary, dev)[5]:
      raise errors.OpExecError("MD device %s is degraded!" % name)
    cfg.SetDiskID(new_drbd, primary)
    if rpc.call_blockdev_find(primary, new_drbd)[5]:
      raise errors.OpExecError("New drbd device %s is degraded!" % name)

  for name, (dev, child, new_drbd) in replaced.items():
    logger.Info("remove mirror %s component" % name)
    cfg.SetDiskID(dev, primary)
    if not rpc.call_blockdev_removechildren(primary, dev, [child]):
      logger.Error("Can't remove child from mirror, aborting"
                   " *this device cleanup*.\nYou need to cleanup manually!!")
      continue

    for node in child.logical_id[:2]:
      logger.Info("remove child device on %s" % node)
      cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    dev.children.remove(child)

    cfg.AddInstance(instance)
def _ExecD8DiskOnly(self, feedback_fn):
  """Replace a disk on the primary or secondary for drbd8.

  The algorithm for replace is quite complicated:
    - for each disk to be replaced:
      - create new LVs on the target node with unique names
      - detach old LVs from the drbd device
      - rename old LVs to name_replaced-<time_t>
      - rename new LVs to old LVs
      - attach the new LVs (with the old names now) to the drbd device
    - wait for sync across all devices
    - for each modified disk:
      - remove old LVs (which have the name name_replaced-<time_t>)

  Failures are not very well handled.

  The node on which the LVs are replaced is self.tgt_node and its peer
  is self.oth_node; only the disks named in self.op.disks are touched.

  Raises:
    errors.OpExecError: if a verification or one of the replacement
      steps fails

  """
  steps_total = 6
  warning, info = (self.proc.LogWarning, self.proc.LogInfo)
  instance = self.instance
  # maps the disk name to (drbd device, old LVs, new LVs)
  iv_names = {}
  vgname = self.cfg.GetVGName()
  # start of work
  cfg = self.cfg
  tgt_node = self.tgt_node
  oth_node = self.oth_node

  # Step: check device activation
  self.proc.LogStep(1, steps_total, "check device existence")
  info("checking volume groups")
  my_vg = cfg.GetVGName()
  results = rpc.call_vg_list([oth_node, tgt_node])
  if not results:
    raise errors.OpExecError("Can't list volume groups on the nodes")
  for node in oth_node, tgt_node:
    res = results.get(node, False)
    if not res or my_vg not in res:
      raise errors.OpExecError("Volume group '%s' not found on %s" %
                               (my_vg, node))
  for dev in instance.disks:
    if dev.iv_name not in self.op.disks:
      continue
    for node in tgt_node, oth_node:
      info("checking %s on %s" % (dev.iv_name, node))
      cfg.SetDiskID(dev, node)
      if not rpc.call_blockdev_find(node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, node))

  # Step: check other node consistency
  self.proc.LogStep(2, steps_total, "check peer consistency")
  for dev in instance.disks:
    if dev.iv_name not in self.op.disks:
      continue
    info("checking %s consistency on %s" % (dev.iv_name, oth_node))
    if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                 oth_node == instance.primary_node):
      raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                               " to replace disks on this node (%s)" %
                               (oth_node, tgt_node))

  # Step: create new storage
  self.proc.LogStep(3, steps_total, "allocate new storage")
  for dev in instance.disks:
    if dev.iv_name not in self.op.disks:
      continue
    size = dev.size
    cfg.SetDiskID(dev, tgt_node)
    lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(cfg, lv_names)
    # the data LV has the size of the disk, the meta LV a fixed one
    lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                           logical_id=(vgname, names[0]))
    lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                           logical_id=(vgname, names[1]))
    new_lvs = [lv_data, lv_meta]
    old_lvs = dev.children
    iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
    info("creating new local storage on %s for %s" %
         (tgt_node, dev.iv_name))
    # since we *always* want to create this LV, we use the
    # _Create...OnPrimary (which forces the creation), even if we
    # are talking about the secondary node
    for new_lv in new_lvs:
      if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new LV named '%s' on"
                                 " node '%s'" %
                                 (new_lv.logical_id[1], tgt_node))

  # Step: for each lv, detach+rename*2+attach
  self.proc.LogStep(4, steps_total, "change drbd configuration")
  for dev, old_lvs, new_lvs in iv_names.itervalues():
    info("detaching %s drbd from local storage" % dev.iv_name)
    if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
      raise errors.OpExecError("Can't detach drbd from local storage on node"
                               " %s for device %s" % (tgt_node, dev.iv_name))
    #dev.children = []
    #cfg.Update(instance)

    # ok, we created the new LVs, so now we know we have the needed
    # storage; as such, we proceed on the target node to rename
    # old_lv to _old, and new_lv to old_lv; note that we rename LVs
    # using the assumption that logical_id == physical_id (which in
    # turn is the unique_id on that node)

    # FIXME(iustin): use a better name for the replaced LVs
    temp_suffix = int(time.time())
    ren_fn = lambda d, suff: (d.physical_id[0],
                              d.physical_id[1] + "_replaced-%s" % suff)
    # build the rename list based on what LVs exist on the node
    rlist = []
    for to_ren in old_lvs:
      find_res = rpc.call_blockdev_find(tgt_node, to_ren)
      if find_res is not None: # device exists
        rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

    info("renaming the old LVs on the target node")
    if not rpc.call_blockdev_rename(tgt_node, rlist):
      raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
    # now we rename the new LVs to the old LVs
    info("renaming the new LVs on the target node")
    rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
    if not rpc.call_blockdev_rename(tgt_node, rlist):
      raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

    # mirror the renames done on the node in our own objects
    for old, new in zip(old_lvs, new_lvs):
      new.logical_id = old.logical_id
      cfg.SetDiskID(new, tgt_node)

    for disk in old_lvs:
      disk.logical_id = ren_fn(disk, temp_suffix)
      cfg.SetDiskID(disk, tgt_node)

    # now that the new lvs have the old name, we can add them to the device
    info("adding new mirror component on %s" % tgt_node)
    if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
      for new_lv in new_lvs:
        if not rpc.call_blockdev_remove(tgt_node, new_lv):
          # the LV name is interpolated here, as the message is not
          # given any separate format arguments
          warning("Can't rollback device %s" % new_lv.logical_id[1],
                  hint="manually cleanup unused logical volumes")
      raise errors.OpExecError("Can't add local storage to drbd")

    dev.children = new_lvs
    cfg.Update(instance)

  # Step: wait for sync

  # this can fail as the old devices are degraded and _WaitForSync
  # does a combined result over all disks, so we don't check its
  # return value
  self.proc.LogStep(5, steps_total, "sync devices")
  _WaitForSync(cfg, instance, self.proc, unlock=True)

  # so check manually all the devices
  for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
    cfg.SetDiskID(dev, instance.primary_node)
    # element 5 of the find result is used as the degraded flag
    is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
    if is_degr:
      raise errors.OpExecError("DRBD device %s is degraded!" % name)

  # Step: remove old storage
  self.proc.LogStep(6, steps_total, "removing old storage")
  for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
    info("remove logical volumes for %s" % name)
    for lv in old_lvs:
      cfg.SetDiskID(lv, tgt_node)
      # a failed removal is only reported, the other LVs are still tried
      if not rpc.call_blockdev_remove(tgt_node, lv):
        warning("Can't remove old LV", hint="manually remove unused LVs")
def _ExecD8Secondary(self, feedback_fn):
  """Replace the secondary node for drbd8.

  The algorithm for replace is quite complicated:
    - for all disks of the instance:
      - create new LVs on the new node with same names
      - shutdown the drbd device on the old secondary
      - disconnect the drbd network on the primary
      - create the drbd device on the new secondary
      - network attach the drbd on the primary, using an artifice:
        the drbd code for Attach() will connect to the network if it
        finds a device which is connected to the good local disks but
        not network enabled
    - wait for sync across all devices
    - remove all disks from the old secondary

  Failures are not very well handled.

  Args:
    feedback_fn: feedback callback; not used here, progress is reported
                 through self.proc (LogStep/LogInfo/LogWarning)

  Raises:
    errors.OpExecError: if one of the initial checks fails, if the new
                        LVs or DRBD devices cannot be created, if no
                        DRBD could be detached from the network, or if
                        a device is missing or degraded after the sync

  """
  steps_total = 6
  warning, info = (self.proc.LogWarning, self.proc.LogInfo)
  instance = self.instance
  # maps iv_name -> (drbd device, its LV children); filled in step 3 and
  # used for the final check and for the cleanup of the old secondary
  iv_names = {}
  # start of work
  cfg = self.cfg
  old_node = self.tgt_node
  new_node = self.new_node
  pri_node = instance.primary_node

  # Step: check device activation
  self.proc.LogStep(1, steps_total, "check device existence")
  info("checking volume groups")
  my_vg = cfg.GetVGName()
  results = rpc.call_vg_list([pri_node, new_node])
  if not results:
    raise errors.OpExecError("Can't list volume groups on the nodes")
  for node in pri_node, new_node:
    res = results.get(node, False)
    if not res or my_vg not in res:
      raise errors.OpExecError("Volume group '%s' not found on %s" %
                               (my_vg, node))
  for dev in instance.disks:
    if not dev.iv_name in self.op.disks:
      continue
    info("checking %s on %s" % (dev.iv_name, pri_node))
    cfg.SetDiskID(dev, pri_node)
    if not rpc.call_blockdev_find(pri_node, dev):
      raise errors.OpExecError("Can't find device %s on node %s" %
                               (dev.iv_name, pri_node))

  # Step: check other node consistency
  self.proc.LogStep(2, steps_total, "check peer consistency")
  for dev in instance.disks:
    if not dev.iv_name in self.op.disks:
      continue
    info("checking %s consistency on %s" % (dev.iv_name, pri_node))
    if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
      raise errors.OpExecError("Primary node (%s) has degraded storage,"
                               " unsafe to replace the secondary" %
                               pri_node)

  # Step: create new storage
  self.proc.LogStep(3, steps_total, "allocate new storage")
  for dev in instance.disks:
    info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
    # since we *always* want to create this LV, we use the
    # _Create...OnPrimary (which forces the creation), even if we
    # are talking about the secondary node
    for new_lv in dev.children:
      if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new LV named '%s' on"
                                 " node '%s'" %
                                 (new_lv.logical_id[1], new_node))

    iv_names[dev.iv_name] = (dev, dev.children)

  self.proc.LogStep(4, steps_total, "changing drbd configuration")
  for dev in instance.disks:
    info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
    # create new devices on new_node
    new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                            logical_id=(pri_node, new_node,
                                        dev.logical_id[2]),
                            children=dev.children)
    if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                      new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new DRBD on"
                               " node '%s'" % new_node)

  for dev in instance.disks:
    # we have new devices, shutdown the drbd on the old secondary
    info("shutting down drbd for %s on old node" % dev.iv_name)
    cfg.SetDiskID(dev, old_node)
    if not rpc.call_blockdev_shutdown(old_node, dev):
      warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
              hint="Please cleanup this device manually as soon as possible")

  info("detaching primary drbds from the network (=> standalone)")
  done = 0
  for dev in instance.disks:
    cfg.SetDiskID(dev, pri_node)
    # set the physical (unique in bdev terms) id to None, meaning
    # detach from network
    dev.physical_id = (None,) * len(dev.physical_id)
    # and 'find' the device, which will 'fix' it to match the
    # standalone state
    if rpc.call_blockdev_find(pri_node, dev):
      done += 1
    else:
      warning("Failed to detach drbd %s from network, unusual case" %
              dev.iv_name)

  if not done:
    # no detaches succeeded (very unlikely)
    raise errors.OpExecError("Can't detach at least one DRBD from old node")

  # if we managed to detach at least one, we update all the disks of
  # the instance to point to the new secondary
  info("updating instance configuration")
  for dev in instance.disks:
    dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
    cfg.SetDiskID(dev, pri_node)
  cfg.Update(instance)

  # and now perform the drbd attach
  info("attaching primary drbds to new secondary (standalone => connected)")
  for dev in instance.disks:
    info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
    # since the attach is smart, it's enough to 'find' the device,
    # it will automatically activate the network, if the physical_id
    # is correct
    cfg.SetDiskID(dev, pri_node)
    if not rpc.call_blockdev_find(pri_node, dev):
      warning("can't attach drbd %s to new secondary!" % dev.iv_name,
              hint="please do a gnt-instance info to see the status of disks")

  # this can fail as the old devices are degraded and _WaitForSync
  # does a combined result over all disks, so we don't check its
  # return value
  self.proc.LogStep(5, steps_total, "sync devices")
  _WaitForSync(cfg, instance, self.proc, unlock=True)

  # so check manually all the devices
  for name, (dev, old_lvs) in iv_names.iteritems():
    cfg.SetDiskID(dev, pri_node)
    status = rpc.call_blockdev_find(pri_node, dev)
    if not status:
      # a failed lookup must not be indexed; report it as an execution
      # error instead of crashing with a TypeError
      raise errors.OpExecError("Can't find DRBD device %s" % name)
    # element 5 of the find result is used as the "degraded" flag
    if status[5]:
      raise errors.OpExecError("DRBD device %s is degraded!" % name)

  self.proc.LogStep(6, steps_total, "removing old storage")
  for name, (dev, old_lvs) in iv_names.iteritems():
    info("remove logical volumes for %s" % name)
    for lv in old_lvs:
      cfg.SetDiskID(lv, old_node)
      if not rpc.call_blockdev_remove(old_node, lv):
        warning("Can't remove LV on old secondary",
                hint="Cleanup stale volumes by hand")
def Exec(self, feedback_fn):
  """Execute disk replacement.

  This dispatches the disk replacement to the appropriate handler.

  If the instance is marked down, its disks are activated before the
  handler runs and deactivated again afterwards, including when the
  handler raises.

  Args:
    feedback_fn: feedback callback, passed through to the handler

  Returns:
    whatever the selected handler returns

  Raises:
    errors.ProgrammerError: if the disk template has no handler

  """
  instance = self.instance

  # Select the handler first, so that an unsupported disk template is
  # reported before any disks have been activated.
  if instance.disk_template == constants.DT_REMOTE_RAID1:
    fn = self._ExecRR1
  elif instance.disk_template == constants.DT_DRBD8:
    if self.op.remote_node is None:
      fn = self._ExecD8DiskOnly
    else:
      fn = self._ExecD8Secondary
  else:
    raise errors.ProgrammerError("Unhandled disk replacement case")

  # Remember the state once; the same decision drives both the
  # activation and the matching deactivation.
  was_down = (instance.status == "down")

  # Activate the instance disks if we're replacing them on a down instance
  if was_down:
    op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
    self.proc.ChainOpCode(op)

  try:
    ret = fn(feedback_fn)
  finally:
    # Deactivate the instance disks if we're replacing them on a down
    # instance; done in a finally clause so that a failed replacement
    # does not leave the disks of a stopped instance active
    if was_down:
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

  return ret
class LUGrowDisk(LogicalUnit):
  """Logical unit that enlarges one disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]

  def BuildHooksEnv(self):
    """Build the hooks environment and node lists.

    The node list contains the master node and the instance's primary
    node; the same list is returned for both node-list slots.

    """
    hook_env = {}
    hook_env["DISK"] = self.op.disk
    hook_env["AMOUNT"] = self.op.amount
    hook_env.update(_BuildInstanceHookEnvByObject(self.instance))
    hook_nodes = [self.sstore.GetMasterNode(), self.instance.primary_node]
    return hook_env, hook_nodes, hook_nodes

  def CheckPrereq(self):
    """Validate the grow request.

    Checks that the instance exists, the amount is positive, the disk
    template is plain or drbd8, the disk exists, and that every node of
    the instance reports enough free volume group space and a
    consistent disk.  Sets self.instance and self.disk.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    inst = self.cfg.GetInstanceInfo(full_name)
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if self.op.amount <= 0:
      raise errors.OpPrereqError("Invalid grow-by amount: %s" % self.op.amount)

    self.instance = inst
    self.op.instance_name = inst.name

    growable_templates = (constants.DT_PLAIN, constants.DT_DRBD8)
    if inst.disk_template not in growable_templates:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = inst.FindDisk(self.op.disk)
    if self.disk is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, inst.name))

    # primary first, then the secondaries
    all_nodes = [inst.primary_node]
    all_nodes.extend(inst.secondary_nodes)
    node_data = rpc.call_node_info(all_nodes, self.cfg.GetVGName())
    for node in all_nodes:
      node_info = node_data.get(node)
      if not node_info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      free_space = node_info.get('vg_free')
      if not isinstance(free_space, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > free_space:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, free_space, self.op.amount))
      on_primary = (node == inst.primary_node)
      if not _CheckDiskConsistency(self.cfg, self.disk, node, on_primary):
        raise errors.OpPrereqError("Disk %s is degraded or not fully"
                                   " synchronized on node %s,"
                                   " aborting grow." % (self.op.disk, node))

  def Exec(self, feedback_fn):
    """Grow the disk on all nodes and record the new size.

    The secondaries are processed before the primary node.

    """
    inst = self.instance
    disk = self.disk
    grow_order = inst.secondary_nodes + (inst.primary_node,)
    for node in grow_order:
      self.cfg.SetDiskID(disk, node)
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
      # a usable answer is a non-empty 2-tuple: (status, message)
      well_formed = result and isinstance(result, tuple) and len(result) == 2
      if not well_formed:
        raise errors.OpExecError("grow request failed to node %s" % node)
      if not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(inst)
    if not self.op.wait_for_sync:
      return
    if not _WaitForSync(self.cfg, inst, self.proc):
      logger.Error("Warning: disk sync-ing has not returned a good status.\n"
                   " Please check the instance.")
class LUQueryInstanceData(NoHooksLU):
  """Logical unit returning configuration and runtime data of instances.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Resolve the requested instances.

    An empty list selects all instances known to the configuration;
    otherwise every given name must expand to an existing instance.
    The result is stored in self.wanted_instances.

    """
    requested = self.op.instances
    if not isinstance(requested, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if not requested:
      everything = []
      for name in self.cfg.GetInstanceList():
        everything.append(self.cfg.GetInstanceInfo(name))
      self.wanted_instances = everything
      return

    self.wanted_instances = []
    for name in requested:
      full_name = self.cfg.ExpandInstanceName(name)
      inst = self.cfg.GetInstanceInfo(full_name)
      if inst is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      self.wanted_instances.append(inst)

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Collect the status of a block device and, recursively, its children.

    Args:
      instance: the instance owning the device
      snode: secondary node to query; replaced by the peer taken from
             the device's logical_id for DRBD-type devices
      dev: the disk object to query

    Returns:
      a dict describing the device, with the find results from the
      primary ("pstatus") and secondary ("sstatus") nodes

    """
    pnode = instance.primary_node
    self.cfg.SetDiskID(dev, pnode)
    primary_status = rpc.call_blockdev_find(pnode, dev)

    if dev.dev_type in constants.LDS_DRBD:
      # the secondary is whichever end of the logical_id is not the primary
      if dev.logical_id[0] != pnode:
        snode = dev.logical_id[0]
      else:
        snode = dev.logical_id[1]

    secondary_status = None
    if snode:
      self.cfg.SetDiskID(dev, snode)
      secondary_status = rpc.call_blockdev_find(snode, dev)

    child_status = []
    if dev.children:
      for child in dev.children:
        child_status.append(self._ComputeDiskStatus(instance, snode, child))

    return {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": primary_status,
      "sstatus": secondary_status,
      "children": child_status,
      }

  def Exec(self, feedback_fn):
    """Gather and return data, as a dict keyed by instance name."""
    answer = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      run_state = "down"
      if remote_info and "state" in remote_info:
        run_state = "up"

      config_state = "up"
      if instance.status == "down":
        config_state = "down"

      disk_status = []
      for device in instance.disks:
        disk_status.append(self._ComputeDiskStatus(instance, None, device))

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": run_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disk_status,
        "vcpus": instance.vcpus,
        }

      # hypervisor-specific fields
      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        for field in ("kernel_path", "initrd_path"):
          idict[field] = getattr(instance, field)

      if htkind == constants.HT_XEN_HVM31:
        for field in ("hvm_boot_order", "hvm_acpi", "hvm_pae",
                      "hvm_cdrom_image_path"):
          idict[field] = getattr(instance, field)

      if htkind in constants.HTS_REQ_PORT:
        for field in ("vnc_bind_address", "network_port"):
          idict[field] = getattr(instance, field)

      answer[instance.name] = idict

    return answer
4801 class LUSetInstanceParms(LogicalUnit):
4802 """Modifies an instances's parameters.
4805 HPATH = "instance-modify"
4806 HTYPE = constants.HTYPE_INSTANCE
4807 _OP_REQP = ["instance_name"]
4809 def BuildHooksEnv(self):
4812 This runs on the master, primary and secondaries.
4817 args['memory'] = self.mem
4819 args['vcpus'] = self.vcpus
4820 if self.do_ip or self.do_bridge or self.mac:
4824 ip = self.instance.nics[0].ip
4826 bridge = self.bridge
4828 bridge = self.instance.nics[0].bridge
4832 mac = self.instance.nics[0].mac
4833 args['nics'] = [(ip, bridge, mac)]
4834 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4835 nl = [self.sstore.GetMasterNode(),
4836 self.instance.primary_node] + list(self.instance.secondary_nodes)
4839 def CheckPrereq(self):
4840 """Check prerequisites.
4842 This only checks the instance list against the existing names.
4845 self.mem = getattr(self.op, "mem", None)
4846 self.vcpus = getattr(self.op, "vcpus", None)
4847 self.ip = getattr(self.op, "ip", None)
4848 self.mac = getattr(self.op, "mac", None)
4849 self.bridge = getattr(self.op, "bridge", None)
4850 self.kernel_path = getattr(self.op, "kernel_path", None)
4851 self.initrd_path = getattr(self.op, "initrd_path", None)
4852 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4853 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4854 self.hvm_pae = getattr(self.op, "hvm_pae", None)
4855 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4856 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4857 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4858 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4859 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4860 self.vnc_bind_address]
4861 if all_parms.count(None) == len(all_parms):
4862 raise errors.OpPrereqError("No changes submitted")
4863 if self.mem is not None:
4865 self.mem = int(self.mem)
4866 except ValueError, err:
4867 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4868 if self.vcpus is not None:
4870 self.vcpus = int(self.vcpus)
4871 except ValueError, err:
4872 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4873 if self.ip is not None:
4875 if self.ip.lower() == "none":
4878 if not utils.IsValidIP(self.ip):
4879 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4882 self.do_bridge = (self.bridge is not None)
4883 if self.mac is not None:
4884 if self.cfg.IsMacInUse(self.mac):
4885 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4887 if not utils.IsValidMac(self.mac):
4888 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4890 if self.kernel_path is not None:
4891 self.do_kernel_path = True
4892 if self.kernel_path == constants.VALUE_NONE:
4893 raise errors.OpPrereqError("Can't set instance to no kernel")
4895 if self.kernel_path != constants.VALUE_DEFAULT:
4896 if not os.path.isabs(self.kernel_path):
4897 raise errors.OpPrereqError("The kernel path must be an absolute"
4900 self.do_kernel_path = False
4902 if self.initrd_path is not None:
4903 self.do_initrd_path = True
4904 if self.initrd_path not in (constants.VALUE_NONE,
4905 constants.VALUE_DEFAULT):
4906 if not os.path.isabs(self.initrd_path):
4907 raise errors.OpPrereqError("The initrd path must be an absolute"
4910 self.do_initrd_path = False
4912 # boot order verification
4913 if self.hvm_boot_order is not None:
4914 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4915 if len(self.hvm_boot_order.strip("acdn")) != 0:
4916 raise errors.OpPrereqError("invalid boot order specified,"
4917 " must be one or more of [acdn]"
4920 # hvm_cdrom_image_path verification
4921 if self.op.hvm_cdrom_image_path is not None:
4922 if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4923 self.op.hvm_cdrom_image_path.lower() == "none"):
4924 raise errors.OpPrereqError("The path to the HVM CDROM image must"
4925 " be an absolute path or None, not %s" %
4926 self.op.hvm_cdrom_image_path)
4927 if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4928 self.op.hvm_cdrom_image_path.lower() == "none"):
4929 raise errors.OpPrereqError("The HVM CDROM image must either be a"
4930 " regular file or a symlink pointing to"
4931 " an existing regular file, not %s" %
4932 self.op.hvm_cdrom_image_path)
4934 # vnc_bind_address verification
4935 if self.op.vnc_bind_address is not None:
4936 if not utils.IsValidIP(self.op.vnc_bind_address):
4937 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4938 " like a valid IP address" %
4939 self.op.vnc_bind_address)
4941 instance = self.cfg.GetInstanceInfo(
4942 self.cfg.ExpandInstanceName(self.op.instance_name))
4943 if instance is None:
4944 raise errors.OpPrereqError("No such instance name '%s'" %
4945 self.op.instance_name)
4946 self.op.instance_name = instance.name
4947 self.instance = instance
4950 def Exec(self, feedback_fn):
4951 """Modifies an instance.
4953 All parameters take effect only at the next restart of the instance.
4956 instance = self.instance
4958 instance.memory = self.mem
4959 result.append(("mem", self.mem))
4961 instance.vcpus = self.vcpus
4962 result.append(("vcpus", self.vcpus))
4964 instance.nics[0].ip = self.ip
4965 result.append(("ip", self.ip))
4967 instance.nics[0].bridge = self.bridge
4968 result.append(("bridge", self.bridge))
4970 instance.nics[0].mac = self.mac
4971 result.append(("mac", self.mac))
4972 if self.do_kernel_path:
4973 instance.kernel_path = self.kernel_path
4974 result.append(("kernel_path", self.kernel_path))
4975 if self.do_initrd_path:
4976 instance.initrd_path = self.initrd_path
4977 result.append(("initrd_path", self.initrd_path))
4978 if self.hvm_boot_order:
4979 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4980 instance.hvm_boot_order = None
4982 instance.hvm_boot_order = self.hvm_boot_order
4983 result.append(("hvm_boot_order", self.hvm_boot_order))
4984 if self.hvm_acpi is not None:
4985 instance.hvm_acpi = self.hvm_acpi
4986 result.append(("hvm_acpi", self.hvm_acpi))
4987 if self.hvm_pae is not None:
4988 instance.hvm_pae = self.hvm_pae
4989 result.append(("hvm_pae", self.hvm_pae))
4990 if self.hvm_cdrom_image_path:
4991 if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4992 instance.hvm_cdrom_image_path = None
4994 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4995 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4996 if self.vnc_bind_address:
4997 instance.vnc_bind_address = self.vnc_bind_address
4998 result.append(("vnc_bind_address", self.vnc_bind_address))
5000 self.cfg.AddInstance(instance)
class LUQueryExports(NoHooksLU):
  """Logical unit listing the exports present on a set of nodes.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Resolve the optional node list of the opcode into self.nodes.

    """
    requested = getattr(self.op, "nodes", None)
    self.nodes = _GetWantedNodes(self, requested)

  def Exec(self, feedback_fn):
    """Query the selected nodes for their exports.

    Returns:
      the result of the export-list RPC: a dictionary with the
      structure node->(export-list), where export-list is a list of
      the instances exported on that node.

    """
    exports_by_node = rpc.call_export_list(self.nodes)
    return exports_by_node
class LUExportInstance(LogicalUnit):
  """Logical unit exporting an instance to an image on a cluster node.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build the hooks environment and node lists.

    The hooks run on the master, the primary node and the target node.

    """
    hook_env = {}
    hook_env["EXPORT_NODE"] = self.op.target_node
    hook_env["EXPORT_DO_SHUTDOWN"] = self.op.shutdown
    hook_env.update(_BuildInstanceHookEnvByObject(self.instance))
    hook_nodes = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      self.op.target_node,
      ]
    return hook_env, hook_nodes, hook_nodes

  def CheckPrereq(self):
    """Verify that the instance and the destination node exist.

    Sets self.instance and self.dst_node, and replaces the opcode's
    target_node with the name stored in the node object.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(full_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    target_name = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(target_name)
    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Snapshot, export and finalize; then drop older exports elsewhere.

    Only the disk named "sda" is snapshotted.  Export, snapshot removal
    and finalize failures are logged but do not abort the operation.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node

    if self.op.shutdown:
      # shutdown the instance, but not the disks
      stopped = rpc.call_instance_shutdown(src_node, instance)
      if not stopped:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()
    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name != "sda":
          continue
        # snap_name will be a snapshot of an lvm leaf of the one we passed
        snap_name = rpc.call_blockdev_snapshot(src_node, disk)
        if not snap_name:
          logger.Error("could not snapshot block device %s on node %s" %
                       (disk.logical_id[1], src_node))
          continue
        lv_id = (vgname, snap_name)
        snap_disks.append(objects.Disk(dev_type=constants.LD_LV,
                                       size=disk.size,
                                       logical_id=lv_id,
                                       physical_id=lv_id,
                                       iv_name=disk.iv_name))
    finally:
      # restart the instance even if the snapshotting failed
      if self.op.shutdown and instance.status == "up":
        restarted = rpc.call_instance_start(src_node, instance, None)
        if not restarted:
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for snap in snap_disks:
      exported = rpc.call_snapshot_export(src_node, snap, dst_node.name,
                                          instance)
      if not exported:
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (snap.logical_id[1], src_node, dst_node.name))
      removed = rpc.call_blockdev_remove(src_node, snap)
      if not removed:
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (snap.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    other_nodes = self.cfg.GetNodeList()
    other_nodes.remove(dst_node.name)

    # on one-node clusters the list will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if not other_nodes:
      return

    query = opcodes.OpQueryExports(nodes=other_nodes)
    exportlist = self.proc.ChainOpCode(query)
    for node in exportlist:
      if instance.name not in exportlist[node]:
        continue
      if not rpc.call_export_remove(node, instance.name):
        logger.Error("could not remove older export for instance %s"
                     " on node %s" % (instance.name, node))
class LURemoveExport(NoHooksLU):
  """Logical unit removing the exports of a named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites (nothing to check).
    """
    pass

  def Exec(self, feedback_fn):
    """Remove the instance's export from every node that has one.

    A failed removal is logged; a hint is sent through feedback_fn when
    the name could not be expanded and no export was found.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = not instance_name
    if fqdn_warn:
      instance_name = self.op.instance_name

    exportlist = self.proc.ChainOpCode(opcodes.OpQueryExports(nodes=[]))
    found = False
    for node in exportlist:
      if instance_name not in exportlist[node]:
        continue
      found = True
      if not rpc.call_export_remove(node, instance_name):
        logger.Error("could not remove export for instance %s"
                     " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")
class TagsLU(NoHooksLU):
  """Common base for the tag logical units.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Resolve the tagged object into self.target.

    For node and instance tags the opcode's name is replaced by its
    expanded form.

    """
    kind = self.op.kind
    if kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif kind == constants.TAG_NODE:
      full_name = self.cfg.ExpandNodeName(self.op.name)
      if full_name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = full_name
      self.target = self.cfg.GetNodeInfo(full_name)
    elif kind == constants.TAG_INSTANCE:
      full_name = self.cfg.ExpandInstanceName(self.op.name)
      if full_name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = full_name
      self.target = self.cfg.GetInstanceInfo(full_name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(kind))
class LUGetTags(TagsLU):
  """Logical unit returning the tags of one object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Return the tags of the target resolved by CheckPrereq.

    """
    tagged_object = self.target
    return tagged_object.GetTags()
5227 class LUSearchTags(NoHooksLU):
5228 """Searches the tags for a given pattern.
5231 _OP_REQP = ["pattern"]
5233 def CheckPrereq(self):
5234 """Check prerequisites.
5236 This checks the pattern passed for validity by compiling it.
5240 self.re = re.compile(self.op.pattern)
5241 except re.error, err:
5242 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5243 (self.op.pattern, err))
5245 def Exec(self, feedback_fn):
5246 """Returns the tag list.
5250 tgts = [("/cluster", cfg.GetClusterInfo())]
5251 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
5252 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5253 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
5254 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5256 for path, target in tgts:
5257 for tag in target.GetTags():
5258 if self.re.search(tag):
5259 results.append((path, tag))
5263 class LUAddTags(TagsLU):
5264 """Sets a tag on a given object.
5267 _OP_REQP = ["kind", "name", "tags"]
5269 def CheckPrereq(self):
5270 """Check prerequisites.
5272 This checks the type and length of the tag name and value.
5275 TagsLU.CheckPrereq(self)
5276 for tag in self.op.tags:
5277 objects.TaggableObject.ValidateTag(tag)
5279 def Exec(self, feedback_fn):
5284 for tag in self.op.tags:
5285 self.target.AddTag(tag)
5286 except errors.TagError, err:
5287 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5289 self.cfg.Update(self.target)
5290 except errors.ConfigurationError:
5291 raise errors.OpRetryError("There has been a modification to the"
5292 " config file and the operation has been"
5293 " aborted. Please retry.")
class LUDelTags(TagsLU):
  """Logical unit removing a list of tags from one object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Resolve the target and check that it carries all the given tags.

    """
    TagsLU.CheckPrereq(self)
    for candidate in self.op.tags:
      objects.TaggableObject.ValidateTag(candidate, removal=True)

    # tags requested for removal that the target does not have
    missing = frozenset(self.op.tags) - self.target.GetTags()
    if missing:
      quoted = sorted(["'%s'" % tag for tag in missing])
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(quoted)))

  def Exec(self, feedback_fn):
    """Remove the tags from the target and save the configuration.

    """
    target = self.target
    for old_tag in self.op.tags:
      target.RemoveTag(old_tag)

    # a configuration error on update is reported as a retryable failure
    try:
      self.cfg.Update(target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Expand the node list, if one was given.

    The duration itself is not checked here.

    """
    if not self.op.on_nodes:
      return
    self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Run the delay on the master and/or on the selected nodes.

    """
    if self.op.on_master and not utils.TestDelay(self.op.duration):
      raise errors.OpExecError("Error during master delay test")

    if not self.op.on_nodes:
      return

    result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
    if not result:
      raise errors.OpExecError("Complete failure from rpc call")
    # every node must report a true result
    for node in result:
      node_result = result[node]
      if not node_result:
        raise errors.OpExecError("Failure during rpc call to node %s,"
                                 " result: %s" % (node, node_result))
5370 class IAllocator(object):
5371 """IAllocator framework.
5373 An IAllocator instance has three sets of attributes:
5374 - cfg/sstore that are needed to query the cluster
5375 - input data (all members of the _KEYS class attribute are required)
5376 - four buffer attributes (in|out_data|text), that represent the
5377 input (to the external script) in text and data structure format,
5378 and the output from it, again in two formats
5379 - the result variables from the script (success, info, nodes) for
5384 "mem_size", "disks", "disk_template",
5385 "os", "tags", "nics", "vcpus",
def __init__(self, cfg, sstore, mode, name, **kwargs):
  """Store the request parameters and build the allocator input.

  Args:
    cfg: configuration object
    sstore: simple store object
    mode: one of the constants.IALLOCATOR_MODE_* values handled below
    name: stored as self.name
    kwargs: the input fields; must be exactly the key set of the mode

  Raises:
    errors.ProgrammerError: for an unknown mode, or for an unexpected
                            or missing input parameter

  """
  self.cfg = cfg
  self.sstore = sstore
  # init buffer variables
  self.in_text = None
  self.out_text = None
  self.in_data = None
  self.out_data = None
  # init all input fields so that pylint is happy
  self.mode = mode
  self.name = name
  self.mem_size = None
  self.disks = None
  self.disk_template = None
  self.os = None
  self.tags = None
  self.nics = None
  self.vcpus = None
  self.relocate_from = None
  # computed fields
  self.required_nodes = None
  # init result fields
  self.success = None
  self.info = None
  self.nodes = None

  if self.mode == constants.IALLOCATOR_MODE_ALLOC:
    expected_keys = self._ALLO_KEYS
  elif self.mode == constants.IALLOCATOR_MODE_RELOC:
    expected_keys = self._RELO_KEYS
  else:
    raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                 " IAllocator" % self.mode)

  # every keyword given must belong to the mode's key set ...
  for key, value in kwargs.items():
    if key not in expected_keys:
      raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                   " IAllocator" % key)
    setattr(self, key, value)
  # ... and every key of the set must have been given
  for key in expected_keys:
    if key not in kwargs:
      raise errors.ProgrammerError("Missing input parameter '%s' to"
                                   " IAllocator" % key)
  self._BuildInputData()
5424 def _ComputeClusterData(self):
5425 """Compute the generic allocator input data.
5427 This is the data that is independent of the actual operation.
5434 "cluster_name": self.sstore.GetClusterName(),
5435 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5436 "hypervisor_type": self.sstore.GetHypervisorType(),
5437 # we don't have job IDs
5440 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5444 node_list = cfg.GetNodeList()
5445 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5446 for nname in node_list:
5447 ninfo = cfg.GetNodeInfo(nname)
5448 if nname not in node_data or not isinstance(node_data[nname], dict):
5449 raise errors.OpExecError("Can't get data for node %s" % nname)
5450 remote_info = node_data[nname]
5451 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5452 'vg_size', 'vg_free', 'cpu_total']:
5453 if attr not in remote_info:
5454 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5457 remote_info[attr] = int(remote_info[attr])
5458 except ValueError, err:
5459 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5460 " %s" % (nname, attr, str(err)))
5461 # compute memory used by primary instances
5462 i_p_mem = i_p_up_mem = 0
5463 for iinfo in i_list:
5464 if iinfo.primary_node == nname:
5465 i_p_mem += iinfo.memory
5466 if iinfo.status == "up":
5467 i_p_up_mem += iinfo.memory
5469 # compute memory used by instances
5471 "tags": list(ninfo.GetTags()),
5472 "total_memory": remote_info['memory_total'],
5473 "reserved_memory": remote_info['memory_dom0'],
5474 "free_memory": remote_info['memory_free'],
5475 "i_pri_memory": i_p_mem,
5476 "i_pri_up_memory": i_p_up_mem,
5477 "total_disk": remote_info['vg_size'],
5478 "free_disk": remote_info['vg_free'],
5479 "primary_ip": ninfo.primary_ip,
5480 "secondary_ip": ninfo.secondary_ip,
5481 "total_cpus": remote_info['cpu_total'],
5483 node_results[nname] = pnr
5484 data["nodes"] = node_results
5488 for iinfo in i_list:
5489 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5490 for n in iinfo.nics]
5492 "tags": list(iinfo.GetTags()),
5493 "should_run": iinfo.status == "up",
5494 "vcpus": iinfo.vcpus,
5495 "memory": iinfo.memory,
5497 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5499 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5500 "disk_template": iinfo.disk_template,
5502 instance_data[iinfo.name] = pir
5504 data["instances"] = instance_data
def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator: it stores the
    allocation request under the "request" key of self.in_data and sets
    self.required_nodes (2 for network-mirrored disk templates, 1
    otherwise).

    The checks for the completeness of the opcode must have already
    been done.

    Raises:
      errors.OpExecError: if self.disks does not hold exactly two disks

    """
    if len(self.disks) != 2:
        raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"],
                                  self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
        self.required_nodes = 2
    else:
        self.required_nodes = 1

    # NOTE(review): the "type" value is taken from the iallocator
    # protocol, confirm it against the protocol documentation
    request = {
        "type": "allocate",
        "name": self.name,
        "disk_template": self.disk_template,
        "tags": self.tags,
        "os": self.os,
        "vcpus": self.vcpus,
        "memory": self.mem_size,
        "disks": self.disks,
        "disk_space_total": disk_space,
        "nics": self.nics,
        "required_nodes": self.required_nodes,
        }
    self.in_data["request"] = request
def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator: it stores the
    relocation request under the "request" key of self.in_data and sets
    self.required_nodes to 1.

    The checks for the completeness of the opcode must have already
    been done.

    Raises:
      errors.ProgrammerError: if the instance is not in the
          configuration
      errors.OpPrereqError: if the instance is not network-mirrored,
          does not have exactly one secondary node or does not have
          exactly two disks

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
        raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                     " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
        raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
        raise errors.OpPrereqError("Instance has not exactly one"
                                   " secondary node")

    # the size computation below indexes the first two disks, so fail
    # with a clear message instead of an IndexError
    if len(instance.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    # NOTE(review): the "type" value is taken from the iallocator
    # protocol, confirm it against the protocol documentation
    request = {
        "type": "replace_secondary",
        "name": self.name,
        "disk_space_total": disk_space,
        "required_nodes": self.required_nodes,
        "relocate_from": self.relocate_from,
        }
    self.in_data["request"] = request
def _BuildInputData(self):
    """Build input data structures.

    Computes the cluster-wide data, adds the request matching
    self.mode (allocation or relocation) and stores the serialized
    form of self.in_data in self.in_text.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
        self._AddNewInstance()
    else:
        self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and store its output.

    The standard output of the allocator is saved in self.out_text;
    nothing is returned.

    Args:
      name: the name of the allocator to run
      validate: whether to parse and check the output through
          _ValidateResult after a successful run
      call_fn: the function doing the actual call; it is invoked as
          call_fn(master_node, name, input_text) and must return a
          four-element tuple (rcode, stdout, stderr, fail)

    Raises:
      errors.OpExecError: if call_fn returns something else than a
          four-element tuple, if the allocator is not found or if its
          execution failed

    """
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
        raise errors.OpExecError("Invalid result from master iallocator"
                                 " runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
        raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
        # NOTE(review): the wording of the second half of this message
        # (" output: %s") is restored, confirm
        raise errors.OpExecError("Instance allocator call failed: %s,"
                                 " output: %s" %
                                 (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
        self._ValidateResult()
5616 def _ValidateResult(self):
5617 """Process the allocator results.
5619 This will process and if successful save the result in
5620 self.out_data and the other parameters.
5624 rdict = serializer.Load(self.out_text)
5625 except Exception, err:
5626 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5628 if not isinstance(rdict, dict):
5629 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5631 for key in "success", "info", "nodes":
5632 if key not in rdict:
5633 raise errors.OpExecError("Can't parse iallocator results:"
5634 " missing key '%s'" % key)
5635 setattr(self, key, rdict[key])
5637 if not isinstance(rdict["nodes"], list):
5638 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5640 self.out_data = rdict
class LUTestAllocator(NoHooksLU):
    """Run allocator tests.

    This LU runs the allocator tests: depending on the requested
    direction it hands back either the input text built for the
    allocator or the unvalidated output of an actual allocator run.

    """
    # opcode attributes that must be present and not None
    _OP_REQP = ["direction", "mode", "name"]

    def CheckPrereq(self):
        """Check prerequisites.

        This checks the opcode parameters depending on the direction
        and mode of the test. In relocation mode it also expands
        self.op.name and remembers the secondary nodes of the instance
        in self.relocate_from.

        Raises:
          errors.OpPrereqError: if a parameter needed by the selected
              mode or direction is missing or malformed, if the
              instance to allocate already exists or if the instance
              to relocate is not found

        """
        if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
            for attr in ["name", "mem_size", "disks", "disk_template",
                         "os", "tags", "nics", "vcpus"]:
                if not hasattr(self.op, attr):
                    raise errors.OpPrereqError("Missing attribute '%s' on"
                                               " opcode input" % attr)
            iname = self.cfg.ExpandInstanceName(self.op.name)
            if iname is not None:
                raise errors.OpPrereqError("Instance '%s' already in the"
                                           " cluster" % iname)
            if not isinstance(self.op.nics, list):
                raise errors.OpPrereqError("Invalid parameter 'nics'")
            for row in self.op.nics:
                if (not isinstance(row, dict) or
                        "mac" not in row or
                        "ip" not in row or
                        "bridge" not in row):
                    raise errors.OpPrereqError("Invalid contents of the"
                                               " 'nics' parameter")
            if not isinstance(self.op.disks, list):
                raise errors.OpPrereqError("Invalid parameter 'disks'")
            if len(self.op.disks) != 2:
                raise errors.OpPrereqError("Only two-disk configurations"
                                           " supported")
            for row in self.op.disks:
                if (not isinstance(row, dict) or
                        "size" not in row or
                        not isinstance(row["size"], int) or
                        "mode" not in row or
                        row["mode"] not in ['r', 'w']):
                    raise errors.OpPrereqError("Invalid contents of the"
                                               " 'disks' parameter")
        elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
            if not hasattr(self.op, "name"):
                raise errors.OpPrereqError("Missing attribute 'name' on"
                                           " opcode input")
            fname = self.cfg.ExpandInstanceName(self.op.name)
            if fname is None:
                raise errors.OpPrereqError("Instance '%s' not found for"
                                           " relocation" % self.op.name)
            self.op.name = fname
            instance = self.cfg.GetInstanceInfo(fname)
            self.relocate_from = instance.secondary_nodes
        else:
            raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                       self.op.mode)

        if self.op.direction == constants.IALLOCATOR_DIR_OUT:
            # an actual run needs to know which allocator to call
            if (not hasattr(self.op, "allocator") or
                    self.op.allocator is None):
                raise errors.OpPrereqError("Missing allocator name")
        elif self.op.direction != constants.IALLOCATOR_DIR_IN:
            raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                       self.op.direction)

    def Exec(self, feedback_fn):
        """Run the allocator test.

        Args:
          feedback_fn: not used by this LU

        Returns:
          the input text generated for the allocator when the direction
          is constants.IALLOCATOR_DIR_IN, otherwise the raw (not
          validated) output text of the allocator named by
          self.op.allocator

        """
        if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
            ial = IAllocator(self.cfg, self.sstore,
                             mode=self.op.mode,
                             name=self.op.name,
                             mem_size=self.op.mem_size,
                             disks=self.op.disks,
                             disk_template=self.op.disk_template,
                             os=self.op.os,
                             tags=self.op.tags,
                             nics=self.op.nics,
                             vcpus=self.op.vcpus,
                             )
        else:
            ial = IAllocator(self.cfg, self.sstore,
                             mode=self.op.mode,
                             name=self.op.name,
                             relocate_from=list(self.relocate_from),
                             )

        if self.op.direction == constants.IALLOCATOR_DIR_IN:
            result = ial.in_text
        else:
            # the test wants the raw output, so skip the validation
            ial.Run(self.op.allocator, validate=False)
            result = ial.out_text
        return result