4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement CheckPrereq which also fills in the opcode instance
53 with all the fields (even if as None)
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements (REQ_CLUSTER,
58 REQ_MASTER); note that all commands require root permissions
67 def __init__(self, processor, op, cfg, sstore):
68 """Constructor for LogicalUnit.
70 This needs to be overridden in derived classes in order to check op
78 for attr_name in self._OP_REQP:
79 attr_val = getattr(op, attr_name, None)
81 raise errors.OpPrereqError("Required parameter '%s' missing" %
84 if not cfg.IsCluster():
85 raise errors.OpPrereqError("Cluster not initialized yet,"
86 " use 'gnt-cluster init' first.")
88 master = sstore.GetMasterNode()
89 if master != utils.HostInfo().name:
90 raise errors.OpPrereqError("Commands must be run on the master"
93 def CheckPrereq(self):
94 """Check prerequisites for this LU.
96 This method should check that the prerequisites for the execution
97 of this LU are fulfilled. It can do internode communication, but
98 it should be idempotent - no cluster or system changes are
101 The method should raise errors.OpPrereqError in case something is
102 not fulfilled. Its return value is ignored.
104 This method should also update all the parameters of the opcode to
105 their canonical form; e.g. a short node name must be fully
106 expanded after this method has successfully completed (so that
107 hooks, logging, etc. work correctly).
110 raise NotImplementedError
112 def Exec(self, feedback_fn):
115 This method should implement the actual work. It should raise
116 errors.OpExecError for failures that are somewhat dealt with in
120 raise NotImplementedError
122 def BuildHooksEnv(self):
123 """Build hooks environment for this LU.
125 This method should return a three-element tuple consisting of: a dict
126 containing the environment that will be used for running the
127 specific hook for this LU, a list of node names on which the hook
128 should run before the execution, and a list of node names on which
129 the hook should run after the execution.
131 The keys of the dict must not have the 'GANETI_' prefix, as this will
132 be handled by the hooks runner. Also note that additional keys will be
133 added by the hooks runner. If the LU doesn't define any
134 environment, an empty dict (and not None) should be returned.
136 If no nodes are needed, an empty list (and not None) should be returned.
138 Note that if the HPATH for a LU class is None, this function will
142 raise NotImplementedError
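# Illustrative sketch only (not part of the original code): a derived LU's
# BuildHooksEnv typically looks like the following, returning the hook
# environment plus the pre- and post-run node lists; the opcode field used
# here is a hypothetical example.
#
#   def BuildHooksEnv(self):
#     env = {"OP_TARGET": self.op.node_name}
#     nl = [self.sstore.GetMasterNode(), self.op.node_name]
#     return env, nl, nl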
144 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
145 """Notify the LU about the results of its hooks.
147 This method is called every time a hooks phase is executed, and notifies
148 the Logical Unit about the hooks' result. The LU can then use it to alter
149 its result based on the hooks. By default the method does nothing and the
150 previous result is passed back unchanged, but any LU can override it if it
151 wants to use the local cluster hook-scripts somehow.
154 phase: the hooks phase that has just been run
155 hooks_results: the results of the multi-node hooks rpc call
156 feedback_fn: function to send feedback back to the caller
157 lu_result: the previous result this LU had, or None in the PRE phase.
163 class NoHooksLU(LogicalUnit):
164 """Simple LU which runs no hooks.
166 This LU is intended as a parent for other LogicalUnits which will
167 run no hooks, in order to reduce duplicate code.
174 def _AddHostToEtcHosts(hostname):
175 """Wrapper around utils.SetEtcHostsEntry.
178 hi = utils.HostInfo(name=hostname)
179 utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
182 def _RemoveHostFromEtcHosts(hostname):
183 """Wrapper around utils.RemoveEtcHostsEntry.
186 hi = utils.HostInfo(name=hostname)
187 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
188 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
191 def _GetWantedNodes(lu, nodes):
192 """Returns list of checked and expanded node names.
195 nodes: List of nodes (strings) or None for all
198 if not isinstance(nodes, list):
199 raise errors.OpPrereqError("Invalid argument type 'nodes'")
205 node = lu.cfg.ExpandNodeName(name)
207 raise errors.OpPrereqError("No such node name '%s'" % name)
211 wanted = lu.cfg.GetNodeList()
212 return utils.NiceSort(wanted)
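# Usage sketch (illustrative only): LUs whose opcode carries a "nodes" field
# call this from CheckPrereq, e.g.
#
#   self.nodes = _GetWantedNodes(self, self.op.nodes)
#
# An empty list selects every configured node; the result is sorted with
# utils.NiceSort.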
215 def _GetWantedInstances(lu, instances):
216 """Returns list of checked and expanded instance names.
219 instances: List of instances (strings) or None for all
222 if not isinstance(instances, list):
223 raise errors.OpPrereqError("Invalid argument type 'instances'")
228 for name in instances:
229 instance = lu.cfg.ExpandInstanceName(name)
231 raise errors.OpPrereqError("No such instance name '%s'" % name)
232 wanted.append(instance)
235 wanted = lu.cfg.GetInstanceList()
236 return utils.NiceSort(wanted)
239 def _CheckOutputFields(static, dynamic, selected):
240 """Checks whether all selected fields are valid.
243 static: Static fields
244 dynamic: Dynamic fields
247 static_fields = frozenset(static)
248 dynamic_fields = frozenset(dynamic)
250 all_fields = static_fields | dynamic_fields
252 if not all_fields.issuperset(selected):
253 raise errors.OpPrereqError("Unknown output fields selected: %s"
254 % ",".join(frozenset(selected).
255 difference(all_fields)))
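# Usage sketch (illustrative only), mirroring the query LUs further down:
#
#   _CheckOutputFields(static=["node"],
#                      dynamic=["phys", "vg", "name", "size", "instance"],
#                      selected=self.op.output_fields)
#
# This raises errors.OpPrereqError if output_fields names a field that is in
# neither set.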
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259 memory, vcpus, nics):
260 """Builds instance related env variables for hooks from single variables.
263 secondary_nodes: List of secondary nodes as strings
267 "INSTANCE_NAME": name,
268 "INSTANCE_PRIMARY": primary_node,
269 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270 "INSTANCE_OS_TYPE": os_type,
271 "INSTANCE_STATUS": status,
272 "INSTANCE_MEMORY": memory,
273 "INSTANCE_VCPUS": vcpus,
277 nic_count = len(nics)
278 for idx, (ip, bridge, mac) in enumerate(nics):
281 env["INSTANCE_NIC%d_IP" % idx] = ip
282 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
287 env["INSTANCE_NIC_COUNT"] = nic_count
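# Example of the resulting environment for a single-NIC instance
# (illustrative values); the hooks runner later adds the GANETI_ prefix to
# every key:
#   {"INSTANCE_NAME": "inst1", "INSTANCE_PRIMARY": "node1",
#    "INSTANCE_SECONDARIES": "node2", "INSTANCE_OS_TYPE": "debian-etch",
#    "INSTANCE_STATUS": "up", "INSTANCE_MEMORY": 512, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "192.0.2.10", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33", "INSTANCE_NIC_COUNT": 1}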
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293 """Builds instance related env variables for hooks from an object.
296 instance: objects.Instance object of instance
297 override: dict of values to override
300 'name': instance.name,
301 'primary_node': instance.primary_node,
302 'secondary_nodes': instance.secondary_nodes,
303 'os_type': instance.os,
304 'status': instance.status,
305 'memory': instance.memory,
306 'vcpus': instance.vcpus,
307 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
310 args.update(override)
311 return _BuildInstanceHookEnv(**args)
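# Usage sketch (illustrative only): instance LUs build their hook environment
# straight from the configuration object, optionally overriding single
# values, e.g.
#
#   env = _BuildInstanceHookEnvByObject(self.instance,
#                                       override={"status": "down"})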
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315 """Ensure a node has a correct known_hosts entry.
318 fullnode - Fully qualified domain name of host. (str)
319 ip - IPv4 address of host (str)
320 pubkey - the public key of the cluster
323 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
326 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
335 logger.Debug('read %s' % (repr(rawline),))
337 parts = rawline.rstrip('\r\n').split()
339 # Ignore unwanted lines
340 if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341 fields = parts[0].split(',')
346 for spec in [ ip, fullnode ]:
347 if spec not in fields:
352 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353 if haveall and key == pubkey:
355 save_lines.append(rawline)
356 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
359 if havesome and (not haveall or key != pubkey):
361 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
364 save_lines.append(rawline)
367 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
371 save_lines = save_lines + add_lines
373 # Write a new file and replace old.
374 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
376 newfile = os.fdopen(fd, 'w')
378 newfile.write(''.join(save_lines))
381 logger.Debug("Wrote new known_hosts.")
382 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
385 # Simply appending a new line will do the trick.
387 for add in add_lines:
393 def _HasValidVG(vglist, vgname):
394 """Checks if the volume group list is valid.
396 A non-None return value means there's an error, and the return value
397 is the error message.
400 vgsize = vglist.get(vgname, None)
402 return "volume group '%s' missing" % vgname
404 return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
409 def _InitSSHSetup(node):
410 """Setup the SSH configuration for the cluster.
413 This generates a dsa keypair for root, adds the pub key to the
414 permitted hosts and adds the hostkey to its own known hosts.
417 node: the name of this host as a fqdn
420 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
422 for name in priv_key, pub_key:
423 if os.path.exists(name):
424 utils.CreateBackup(name)
425 utils.RemoveFile(name)
427 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
431 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
434 f = open(pub_key, 'r')
436 utils.AddAuthorizedKey(auth_keys, f.read(8192))
441 def _InitGanetiServerSetup(ss):
442 """Setup the necessary configuration for the initial node daemon.
444 This creates the nodepass file containing the shared password for
445 the cluster and also generates the SSL certificate.
448 # Create pseudo random password
449 randpass = sha.new(os.urandom(64)).hexdigest()
450 # and write it into sstore
451 ss.SetKey(ss.SS_NODED_PASS, randpass)
453 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454 "-days", str(365*5), "-nodes", "-x509",
455 "-keyout", constants.SSL_CERT_FILE,
456 "-out", constants.SSL_CERT_FILE, "-batch"])
458 raise errors.OpExecError("could not generate server ssl cert, command"
459 " %s had exitcode %s and error message %s" %
460 (result.cmd, result.exit_code, result.output))
462 os.chmod(constants.SSL_CERT_FILE, 0400)
464 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
467 raise errors.OpExecError("Could not start the node daemon, command %s"
468 " had exitcode %s and error %s" %
469 (result.cmd, result.exit_code, result.output))
472 def _CheckInstanceBridgesExist(instance):
473 """Check that the brigdes needed by an instance exist.
476 # check bridges existance
477 brlist = [nic.bridge for nic in instance.nics]
478 if not rpc.call_bridges_exist(instance.primary_node, brlist):
479 raise errors.OpPrereqError("one or more target bridges %s do not"
480 " exist on destination node '%s'" %
481 (brlist, instance.primary_node))
484 class LUInitCluster(LogicalUnit):
485 """Initialise the cluster.
488 HPATH = "cluster-init"
489 HTYPE = constants.HTYPE_CLUSTER
490 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491 "def_bridge", "master_netdev"]
494 def BuildHooksEnv(self):
497 Notes: Since we don't require a cluster, we must manually add
498 ourselves in the post-run node list.
501 env = {"OP_TARGET": self.op.cluster_name}
502 return env, [], [self.hostname.name]
504 def CheckPrereq(self):
505 """Verify that the passed name is a valid one.
508 if config.ConfigWriter.IsCluster():
509 raise errors.OpPrereqError("Cluster is already initialised")
511 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512 if not os.path.exists(constants.VNC_PASSWORD_FILE):
513 raise errors.OpPrereqError("Please prepare the cluster VNC"
515 constants.VNC_PASSWORD_FILE)
517 self.hostname = hostname = utils.HostInfo()
519 if hostname.ip.startswith("127."):
520 raise errors.OpPrereqError("This host's IP resolves to the private"
521 " range (%s). Please fix DNS or %s." %
522 (hostname.ip, constants.ETC_HOSTS))
524 if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525 source=constants.LOCALHOST_IP_ADDRESS):
526 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527 " to %s,\nbut this ip address does not"
528 " belong to this host."
529 " Aborting." % hostname.ip)
531 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
533 if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
535 raise errors.OpPrereqError("Cluster IP already active. Aborting.")
537 secondary_ip = getattr(self.op, "secondary_ip", None)
538 if secondary_ip and not utils.IsValidIP(secondary_ip):
539 raise errors.OpPrereqError("Invalid secondary ip given")
541 secondary_ip != hostname.ip and
542 (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543 source=constants.LOCALHOST_IP_ADDRESS))):
544 raise errors.OpPrereqError("You gave %s as secondary IP,"
545 " but it does not belong to this host." %
547 self.secondary_ip = secondary_ip
549 # checks presence of the volume group given
550 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
553 raise errors.OpPrereqError("Error: %s" % vgstatus)
555 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
557 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
560 if self.op.hypervisor_type not in constants.HYPER_TYPES:
561 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562 self.op.hypervisor_type)
564 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
566 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567 (self.op.master_netdev,
568 result.output.strip()))
570 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572 raise errors.OpPrereqError("Init.d script '%s' missing or not"
573 " executable." % constants.NODE_INITD_SCRIPT)
575 def Exec(self, feedback_fn):
576 """Initialize the cluster.
579 clustername = self.clustername
580 hostname = self.hostname
582 # set up the simple store
583 self.sstore = ss = ssconf.SimpleStore()
584 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
590 # set up the inter-node password and certificate
591 _InitGanetiServerSetup(ss)
593 # start the master ip
594 rpc.call_node_start_master(hostname.name)
596 # set up ssh config and /etc/hosts
597 f = open(constants.SSH_HOST_RSA_PUB, 'r')
602 sshkey = sshline.split(" ")[1]
604 _AddHostToEtcHosts(hostname.name)
606 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
608 _InitSSHSetup(hostname.name)
610 # init of cluster config file
611 self.cfg = cfgw = config.ConfigWriter()
612 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613 sshkey, self.op.mac_prefix,
614 self.op.vg_name, self.op.def_bridge)
617 class LUDestroyCluster(NoHooksLU):
618 """Logical unit for destroying the cluster.
623 def CheckPrereq(self):
624 """Check prerequisites.
626 This checks whether the cluster is empty.
628 Any errors are signalled by raising errors.OpPrereqError.
631 master = self.sstore.GetMasterNode()
633 nodelist = self.cfg.GetNodeList()
634 if len(nodelist) != 1 or nodelist[0] != master:
635 raise errors.OpPrereqError("There are still %d node(s) in"
636 " this cluster." % (len(nodelist) - 1))
637 instancelist = self.cfg.GetInstanceList()
639 raise errors.OpPrereqError("There are still %d instance(s) in"
640 " this cluster." % len(instancelist))
642 def Exec(self, feedback_fn):
643 """Destroys the cluster.
646 master = self.sstore.GetMasterNode()
647 if not rpc.call_node_stop_master(master):
648 raise errors.OpExecError("Could not disable the master role")
649 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650 utils.CreateBackup(priv_key)
651 utils.CreateBackup(pub_key)
652 rpc.call_node_leave_cluster(master)
655 class LUVerifyCluster(LogicalUnit):
656 """Verifies the cluster status.
659 HPATH = "cluster-verify"
660 HTYPE = constants.HTYPE_CLUSTER
661 _OP_REQP = ["skip_checks"]
663 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664 remote_version, feedback_fn):
665 """Run multiple tests against a node.
668 - compares ganeti version
669 - checks vg existence and size > 20G
670 - checks config file checksum
671 - checks ssh to other nodes
674 node: name of the node to check
675 file_list: required list of files
676 local_cksum: dictionary of local files and their checksums
679 # compares ganeti version
680 local_version = constants.PROTOCOL_VERSION
681 if not remote_version:
682 feedback_fn(" - ERROR: connection to %s failed" % (node))
685 if local_version != remote_version:
686 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
687 (local_version, node, remote_version))
690 # checks vg existence and size > 20G
694 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
698 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
700 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
703 # checks config file checksum
706 if 'filelist' not in node_result:
708 feedback_fn(" - ERROR: node hasn't returned file checksum data")
710 remote_cksum = node_result['filelist']
711 for file_name in file_list:
712 if file_name not in remote_cksum:
714 feedback_fn(" - ERROR: file '%s' missing" % file_name)
715 elif remote_cksum[file_name] != local_cksum[file_name]:
717 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
719 if 'nodelist' not in node_result:
721 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
723 if node_result['nodelist']:
725 for node in node_result['nodelist']:
726 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
727 (node, node_result['nodelist'][node]))
728 if 'node-net-test' not in node_result:
730 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
732 if node_result['node-net-test']:
734 nlist = utils.NiceSort(node_result['node-net-test'].keys())
736 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
737 (node, node_result['node-net-test'][node]))
739 hyp_result = node_result.get('hypervisor', None)
740 if hyp_result is not None:
741 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
744 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
745 node_instance, feedback_fn):
746 """Verify an instance.
748 This function checks to see if the required block devices are
749 available on the instance's node.
754 node_current = instanceconfig.primary_node
757 instanceconfig.MapLVsByNode(node_vol_should)
759 for node in node_vol_should:
760 for volume in node_vol_should[node]:
761 if node not in node_vol_is or volume not in node_vol_is[node]:
762 feedback_fn(" - ERROR: volume %s missing on node %s" %
766 if instanceconfig.status != 'down':
767 if (node_current not in node_instance or
768 instance not in node_instance[node_current]):
769 feedback_fn(" - ERROR: instance %s not running on node %s" %
770 (instance, node_current))
773 for node in node_instance:
774 if node != node_current:
775 if instance in node_instance[node]:
776 feedback_fn(" - ERROR: instance %s should not run on node %s" %
782 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
783 """Verify if there are any unknown volumes in the cluster.
785 The .os, .swap and backup volumes are ignored. All other volumes are
791 for node in node_vol_is:
792 for volume in node_vol_is[node]:
793 if node not in node_vol_should or volume not in node_vol_should[node]:
794 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
799 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
800 """Verify the list of running instances.
802 This checks what instances are running but unknown to the cluster.
806 for node in node_instance:
807 for runninginstance in node_instance[node]:
808 if runninginstance not in instancelist:
809 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
810 (runninginstance, node))
814 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
815 """Verify N+1 Memory Resilience.
817 Check that if one single node dies we can still start all the instances it
823 for node, nodeinfo in node_info.iteritems():
824 # This code checks that every node which is now listed as secondary has
825 # enough memory to host all the instances it is secondary for, should
826 # a single other node in the cluster fail.
827 # FIXME: not ready for failover to an arbitrary node
828 # FIXME: does not support file-backed instances
829 # WARNING: we currently take into account down instances as well as up
830 # ones, considering that even if they're down someone might want to start
831 # them even in the event of a node failure.
832 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
834 for instance in instances:
835 needed_mem += instance_cfg[instance].memory
836 if nodeinfo['mfree'] < needed_mem:
837 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
838 " failovers should node %s fail" % (node, prinode))
842 def CheckPrereq(self):
843 """Check prerequisites.
845 Transform the list of checks we're going to skip into a set and check that
846 all its members are valid.
849 self.skip_set = frozenset(self.op.skip_checks)
850 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
851 raise errors.OpPrereqError("Invalid checks to be skipped specified")
853 def BuildHooksEnv(self):
856 Cluster-Verify hooks are only run in the post phase, and their failure
857 causes the output to be logged in the verify output and the verification to fail.
860 all_nodes = self.cfg.GetNodeList()
861 tags = self.cfg.GetClusterInfo().GetTags()
862 # TODO: populate the environment with useful information for verify hooks
864 "CLUSTER_TAGS": " ".join(tags),
866 return env, [], all_nodes
868 def Exec(self, feedback_fn):
869 """Verify integrity of cluster, performing various test on nodes.
873 feedback_fn("* Verifying global settings")
874 for msg in self.cfg.VerifyConfig():
875 feedback_fn(" - ERROR: %s" % msg)
877 vg_name = self.cfg.GetVGName()
878 nodelist = utils.NiceSort(self.cfg.GetNodeList())
879 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
880 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
881 i_non_redundant = [] # Non redundant instances
887 # FIXME: verify OS list
889 file_names = list(self.sstore.GetFileList())
890 file_names.append(constants.SSL_CERT_FILE)
891 file_names.append(constants.CLUSTER_CONF_FILE)
892 local_checksums = utils.FingerprintFiles(file_names)
894 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
895 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
896 all_instanceinfo = rpc.call_instance_list(nodelist)
897 all_vglist = rpc.call_vg_list(nodelist)
898 node_verify_param = {
899 'filelist': file_names,
900 'nodelist': nodelist,
902 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
903 for node in nodeinfo]
905 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
906 all_rversion = rpc.call_version(nodelist)
907 all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
909 incomplete_nodeinfo = False
911 for node in nodelist:
912 feedback_fn("* Verifying node %s" % node)
913 result = self._VerifyNode(node, file_names, local_checksums,
914 all_vglist[node], all_nvinfo[node],
915 all_rversion[node], feedback_fn)
919 volumeinfo = all_volumeinfo[node]
921 if isinstance(volumeinfo, basestring):
922 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
923 (node, volumeinfo[-400:].encode('string_escape')))
925 node_volume[node] = {}
926 elif not isinstance(volumeinfo, dict):
927 feedback_fn(" - ERROR: connection to %s failed" % (node,))
929 incomplete_nodeinfo = True
932 node_volume[node] = volumeinfo
935 nodeinstance = all_instanceinfo[node]
936 if not isinstance(nodeinstance, list):
937 feedback_fn(" - ERROR: connection to %s failed" % (node,))
939 incomplete_nodeinfo = True
942 node_instance[node] = nodeinstance
945 nodeinfo = all_ninfo[node]
946 if not isinstance(nodeinfo, dict):
947 feedback_fn(" - ERROR: connection to %s failed" % (node,))
949 incomplete_nodeinfo = True
954 "mfree": int(nodeinfo['memory_free']),
955 "dfree": int(nodeinfo['vg_free']),
958 # dictionary holding all instances this node is secondary for,
959 # grouped by their primary node. Each key is a cluster node, and each
960 # value is a list of instances which have the key as primary and the
961 # current node as secondary. This is handy to calculate N+1 memory
962 # availability if you can only failover from a primary to its
964 "sinst-by-pnode": {},
966 except (ValueError, TypeError):
967 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
969 incomplete_nodeinfo = True
974 for instance in instancelist:
975 feedback_fn("* Verifying instance %s" % instance)
976 inst_config = self.cfg.GetInstanceInfo(instance)
977 result = self._VerifyInstance(instance, inst_config, node_volume,
978 node_instance, feedback_fn)
981 inst_config.MapLVsByNode(node_vol_should)
983 instance_cfg[instance] = inst_config
985 pnode = inst_config.primary_node
986 if pnode in node_info:
987 node_info[pnode]['pinst'].append(instance)
989 feedback_fn(" - ERROR: instance %s, connection to primary node"
990 " %s failed" % (instance, pnode))
993 # If the instance is non-redundant we cannot survive losing its primary
994 # node, so we are not N+1 compliant. On the other hand we have no disk
995 # templates with more than one secondary so that situation is not well
997 # FIXME: does not support file-backed instances
998 if len(inst_config.secondary_nodes) == 0:
999 i_non_redundant.append(instance)
1000 elif len(inst_config.secondary_nodes) > 1:
1001 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1004 for snode in inst_config.secondary_nodes:
1005 if snode in node_info:
1006 node_info[snode]['sinst'].append(instance)
1007 if pnode not in node_info[snode]['sinst-by-pnode']:
1008 node_info[snode]['sinst-by-pnode'][pnode] = []
1009 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1011 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1012 " %s failed" % (instance, snode))
1014 feedback_fn("* Verifying orphan volumes")
1015 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1019 feedback_fn("* Verifying remaining instances")
1020 result = self._VerifyOrphanInstances(instancelist, node_instance,
1024 if (constants.VERIFY_NPLUSONE_MEM not in self.skip_set and
1025 not incomplete_nodeinfo):
1026 feedback_fn("* Verifying N+1 Memory redundancy")
1027 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1030 feedback_fn("* Other Notes")
1032 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1033 % len(i_non_redundant))
1037 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1038 """Analize the post-hooks' result, handle it, and send some
1039 nicely-formatted feedback back to the user.
1042 phase: the hooks phase that has just been run
1043 hooks_results: the results of the multi-node hooks rpc call
1044 feedback_fn: function to send feedback back to the caller
1045 lu_result: previous Exec result
1048 # We only really run POST phase hooks, and are only interested in their results
1049 if phase == constants.HOOKS_PHASE_POST:
1050 # Used to change hooks' output to proper indentation
1051 indent_re = re.compile('^', re.M)
1052 feedback_fn("* Hooks Results")
1053 if not hooks_results:
1054 feedback_fn(" - ERROR: general communication failure")
1057 for node_name in hooks_results:
1058 show_node_header = True
1059 res = hooks_results[node_name]
1060 if res is False or not isinstance(res, list):
1061 feedback_fn(" Communication failure")
1064 for script, hkr, output in res:
1065 if hkr == constants.HKR_FAIL:
1066 # The node header is only shown once, if there are
1067 # failing hooks on that node
1068 if show_node_header:
1069 feedback_fn(" Node %s:" % node_name)
1070 show_node_header = False
1071 feedback_fn(" ERROR: Script %s failed, output:" % script)
1072 output = indent_re.sub(' ', output)
1073 feedback_fn("%s" % output)
1079 class LUVerifyDisks(NoHooksLU):
1080 """Verifies the cluster disks status.
1085 def CheckPrereq(self):
1086 """Check prerequisites.
1088 This has no prerequisites.
1093 def Exec(self, feedback_fn):
1094 """Verify integrity of cluster disks.
1097 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1099 vg_name = self.cfg.GetVGName()
1100 nodes = utils.NiceSort(self.cfg.GetNodeList())
1101 instances = [self.cfg.GetInstanceInfo(name)
1102 for name in self.cfg.GetInstanceList()]
1105 for inst in instances:
1107 if (inst.status != "up" or
1108 inst.disk_template not in constants.DTS_NET_MIRROR):
1110 inst.MapLVsByNode(inst_lvs)
1111 # transform { iname: {node: [vol,],},} to {(node, vol): inst}
1112 for node, vol_list in inst_lvs.iteritems():
1113 for vol in vol_list:
1114 nv_dict[(node, vol)] = inst
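# Illustrative example of the map built above: an instance object "inst1"
# with LV "xenvg/disk0" mirrored on "node1" and "node2" yields
#   {("node1", "xenvg/disk0"): <inst1>, ("node2", "xenvg/disk0"): <inst1>}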
1119 node_lvs = rpc.call_volume_list(nodes, vg_name)
1124 lvs = node_lvs[node]
1126 if isinstance(lvs, basestring):
1127 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1128 res_nlvm[node] = lvs
1129 elif not isinstance(lvs, dict):
1130 logger.Info("connection to node %s failed or invalid data returned" %
1132 res_nodes.append(node)
1135 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1136 inst = nv_dict.pop((node, lv_name), None)
1137 if (not lv_online and inst is not None
1138 and inst.name not in res_instances):
1139 res_instances.append(inst.name)
1141 # any leftover items in nv_dict are missing LVs, let's arrange the
1143 for key, inst in nv_dict.iteritems():
1144 if inst.name not in res_missing:
1145 res_missing[inst.name] = []
1146 res_missing[inst.name].append(key)
1151 class LURenameCluster(LogicalUnit):
1152 """Rename the cluster.
1155 HPATH = "cluster-rename"
1156 HTYPE = constants.HTYPE_CLUSTER
1159 def BuildHooksEnv(self):
1164 "OP_TARGET": self.sstore.GetClusterName(),
1165 "NEW_NAME": self.op.name,
1167 mn = self.sstore.GetMasterNode()
1168 return env, [mn], [mn]
1170 def CheckPrereq(self):
1171 """Verify that the passed name is a valid one.
1174 hostname = utils.HostInfo(self.op.name)
1176 new_name = hostname.name
1177 self.ip = new_ip = hostname.ip
1178 old_name = self.sstore.GetClusterName()
1179 old_ip = self.sstore.GetMasterIP()
1180 if new_name == old_name and new_ip == old_ip:
1181 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1182 " cluster has changed")
1183 if new_ip != old_ip:
1184 result = utils.RunCmd(["fping", "-q", new_ip])
1185 if not result.failed:
1186 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1187 " reachable on the network. Aborting." %
1190 self.op.name = new_name
1192 def Exec(self, feedback_fn):
1193 """Rename the cluster.
1196 clustername = self.op.name
1200 # shutdown the master IP
1201 master = ss.GetMasterNode()
1202 if not rpc.call_node_stop_master(master):
1203 raise errors.OpExecError("Could not disable the master role")
1207 ss.SetKey(ss.SS_MASTER_IP, ip)
1208 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1210 # Distribute updated ss config to all nodes
1211 myself = self.cfg.GetNodeInfo(master)
1212 dist_nodes = self.cfg.GetNodeList()
1213 if myself.name in dist_nodes:
1214 dist_nodes.remove(myself.name)
1216 logger.Debug("Copying updated ssconf data to all nodes")
1217 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1218 fname = ss.KeyToFilename(keyname)
1219 result = rpc.call_upload_file(dist_nodes, fname)
1220 for to_node in dist_nodes:
1221 if not result[to_node]:
1222 logger.Error("copy of file %s to node %s failed" %
1225 if not rpc.call_node_start_master(master):
1226 logger.Error("Could not re-enable the master role on the master,"
1227 " please restart manually.")
1230 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1231 """Sleep and poll for an instance's disk to sync.
1234 if not instance.disks:
1238 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1240 node = instance.primary_node
1242 for dev in instance.disks:
1243 cfgw.SetDiskID(dev, node)
1249 cumul_degraded = False
1250 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1252 proc.LogWarning("Can't get any data from node %s" % node)
1255 raise errors.RemoteError("Can't contact node %s for mirror data,"
1256 " aborting." % node)
1260 for i in range(len(rstats)):
1263 proc.LogWarning("Can't compute data for node %s/%s" %
1264 (node, instance.disks[i].iv_name))
1266 # we ignore the ldisk parameter
1267 perc_done, est_time, is_degraded, _ = mstat
1268 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1269 if perc_done is not None:
1271 if est_time is not None:
1272 rem_time = "%d estimated seconds remaining" % est_time
1275 rem_time = "no time estimate"
1276 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1277 (instance.disks[i].iv_name, perc_done, rem_time))
1284 time.sleep(min(60, max_time))
1290 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1291 return not cumul_degraded
1294 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1295 """Check that mirrors are not degraded.
1297 The ldisk parameter, if True, will change the test from the
1298 is_degraded attribute (which represents overall non-ok status for
1299 the device(s)) to the ldisk (representing the local storage status).
1302 cfgw.SetDiskID(dev, node)
1309 if on_primary or dev.AssembleOnSecondary():
1310 rstats = rpc.call_blockdev_find(node, dev)
1312 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1315 result = result and (not rstats[idx])
1317 for child in dev.children:
1318 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1323 class LUDiagnoseOS(NoHooksLU):
1324 """Logical unit for OS diagnose/query.
1327 _OP_REQP = ["output_fields", "names"]
1329 def CheckPrereq(self):
1330 """Check prerequisites.
1332 This always succeeds, since this is a pure query LU.
1336 raise errors.OpPrereqError("Selective OS query not supported")
1338 self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1339 _CheckOutputFields(static=[],
1340 dynamic=self.dynamic_fields,
1341 selected=self.op.output_fields)
1344 def _DiagnoseByOS(node_list, rlist):
1345 """Remaps a per-node return list into an a per-os per-node dictionary
1348 node_list: a list with the names of all nodes
1349 rlist: a map with node names as keys and OS objects as values
1352 map: a map with osnames as keys and as value another map, with
1354 keys and list of OS objects as values
1355 e.g. {"debian-etch": {"node1": [<object>,...],
1356 "node2": [<object>,]}
1361 for node_name, nr in rlist.iteritems():
1365 if os.name not in all_os:
1366 # build a list of nodes for this os containing empty lists
1367 # for each node in node_list
1368 all_os[os.name] = {}
1369 for nname in node_list:
1370 all_os[os.name][nname] = []
1371 all_os[os.name][node_name].append(os)
1374 def Exec(self, feedback_fn):
1375 """Compute the list of OSes.
1378 node_list = self.cfg.GetNodeList()
1379 node_data = rpc.call_os_diagnose(node_list)
1380 if node_data == False:
1381 raise errors.OpExecError("Can't gather the list of OSes")
1382 pol = self._DiagnoseByOS(node_list, node_data)
1384 for os_name, os_data in pol.iteritems():
1386 for field in self.op.output_fields:
1389 elif field == "valid":
1390 val = utils.all([osl and osl[0] for osl in os_data.values()])
1391 elif field == "node_status":
1393 for node_name, nos_list in os_data.iteritems():
1394 val[node_name] = [(v.status, v.path) for v in nos_list]
1396 raise errors.ParameterError(field)
1403 class LURemoveNode(LogicalUnit):
1404 """Logical unit for removing a node.
1407 HPATH = "node-remove"
1408 HTYPE = constants.HTYPE_NODE
1409 _OP_REQP = ["node_name"]
1411 def BuildHooksEnv(self):
1414 This doesn't run on the target node in the pre phase as a failed
1415 node would not allow itself to run.
1419 "OP_TARGET": self.op.node_name,
1420 "NODE_NAME": self.op.node_name,
1422 all_nodes = self.cfg.GetNodeList()
1423 all_nodes.remove(self.op.node_name)
1424 return env, all_nodes, all_nodes
1426 def CheckPrereq(self):
1427 """Check prerequisites.
1430 - the node exists in the configuration
1431 - it does not have primary or secondary instances
1432 - it's not the master
1434 Any errors are signalled by raising errors.OpPrereqError.
1437 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1439 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1441 instance_list = self.cfg.GetInstanceList()
1443 masternode = self.sstore.GetMasterNode()
1444 if node.name == masternode:
1445 raise errors.OpPrereqError("Node is the master node,"
1446 " you need to failover first.")
1448 for instance_name in instance_list:
1449 instance = self.cfg.GetInstanceInfo(instance_name)
1450 if node.name == instance.primary_node:
1451 raise errors.OpPrereqError("Instance %s still running on the node,"
1452 " please remove first." % instance_name)
1453 if node.name in instance.secondary_nodes:
1454 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1455 " please remove first." % instance_name)
1456 self.op.node_name = node.name
1459 def Exec(self, feedback_fn):
1460 """Removes the node from the cluster.
1464 logger.Info("stopping the node daemon and removing configs from node %s" %
1467 rpc.call_node_leave_cluster(node.name)
1469 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1471 logger.Info("Removing node %s from config" % node.name)
1473 self.cfg.RemoveNode(node.name)
1475 _RemoveHostFromEtcHosts(node.name)
1478 class LUQueryNodes(NoHooksLU):
1479 """Logical unit for querying nodes.
1482 _OP_REQP = ["output_fields", "names"]
1484 def CheckPrereq(self):
1485 """Check prerequisites.
1487 This checks that the fields required are valid output fields.
1490 self.dynamic_fields = frozenset([
1492 "mtotal", "mnode", "mfree",
1497 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1498 "pinst_list", "sinst_list",
1499 "pip", "sip", "tags"],
1500 dynamic=self.dynamic_fields,
1501 selected=self.op.output_fields)
1503 self.wanted = _GetWantedNodes(self, self.op.names)
1505 def Exec(self, feedback_fn):
1506 """Computes the list of nodes and their attributes.
1509 nodenames = self.wanted
1510 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1512 # begin data gathering
1514 if self.dynamic_fields.intersection(self.op.output_fields):
1516 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1517 for name in nodenames:
1518 nodeinfo = node_data.get(name, None)
1521 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1522 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1523 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1524 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1525 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1526 "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1527 "bootid": nodeinfo['bootid'],
1530 live_data[name] = {}
1532 live_data = dict.fromkeys(nodenames, {})
1534 node_to_primary = dict([(name, set()) for name in nodenames])
1535 node_to_secondary = dict([(name, set()) for name in nodenames])
1537 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1538 "sinst_cnt", "sinst_list"))
1539 if inst_fields & frozenset(self.op.output_fields):
1540 instancelist = self.cfg.GetInstanceList()
1542 for instance_name in instancelist:
1543 inst = self.cfg.GetInstanceInfo(instance_name)
1544 if inst.primary_node in node_to_primary:
1545 node_to_primary[inst.primary_node].add(inst.name)
1546 for secnode in inst.secondary_nodes:
1547 if secnode in node_to_secondary:
1548 node_to_secondary[secnode].add(inst.name)
1550 # end data gathering
1553 for node in nodelist:
1555 for field in self.op.output_fields:
1558 elif field == "pinst_list":
1559 val = list(node_to_primary[node.name])
1560 elif field == "sinst_list":
1561 val = list(node_to_secondary[node.name])
1562 elif field == "pinst_cnt":
1563 val = len(node_to_primary[node.name])
1564 elif field == "sinst_cnt":
1565 val = len(node_to_secondary[node.name])
1566 elif field == "pip":
1567 val = node.primary_ip
1568 elif field == "sip":
1569 val = node.secondary_ip
1570 elif field == "tags":
1571 val = list(node.GetTags())
1572 elif field in self.dynamic_fields:
1573 val = live_data[node.name].get(field, None)
1575 raise errors.ParameterError(field)
1576 node_output.append(val)
1577 output.append(node_output)
1582 class LUQueryNodeVolumes(NoHooksLU):
1583 """Logical unit for getting volumes on node(s).
1586 _OP_REQP = ["nodes", "output_fields"]
1588 def CheckPrereq(self):
1589 """Check prerequisites.
1591 This checks that the fields required are valid output fields.
1594 self.nodes = _GetWantedNodes(self, self.op.nodes)
1596 _CheckOutputFields(static=["node"],
1597 dynamic=["phys", "vg", "name", "size", "instance"],
1598 selected=self.op.output_fields)
1601 def Exec(self, feedback_fn):
1602 """Computes the list of nodes and their attributes.
1605 nodenames = self.nodes
1606 volumes = rpc.call_node_volumes(nodenames)
1608 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1609 in self.cfg.GetInstanceList()]
1611 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1614 for node in nodenames:
1615 if node not in volumes or not volumes[node]:
1618 node_vols = volumes[node][:]
1619 node_vols.sort(key=lambda vol: vol['dev'])
1621 for vol in node_vols:
1623 for field in self.op.output_fields:
1626 elif field == "phys":
1630 elif field == "name":
1632 elif field == "size":
1633 val = int(float(vol['size']))
1634 elif field == "instance":
1636 if node not in lv_by_node[inst]:
1638 if vol['name'] in lv_by_node[inst][node]:
1644 raise errors.ParameterError(field)
1645 node_output.append(str(val))
1647 output.append(node_output)
1652 class LUAddNode(LogicalUnit):
1653 """Logical unit for adding node to the cluster.
1657 HTYPE = constants.HTYPE_NODE
1658 _OP_REQP = ["node_name"]
1660 def BuildHooksEnv(self):
1663 This will run on all nodes before, and on all nodes + the new node after.
1667 "OP_TARGET": self.op.node_name,
1668 "NODE_NAME": self.op.node_name,
1669 "NODE_PIP": self.op.primary_ip,
1670 "NODE_SIP": self.op.secondary_ip,
1672 nodes_0 = self.cfg.GetNodeList()
1673 nodes_1 = nodes_0 + [self.op.node_name, ]
1674 return env, nodes_0, nodes_1
1676 def CheckPrereq(self):
1677 """Check prerequisites.
1680 - the new node is not already in the config
1682 - its parameters (single/dual homed) match the cluster
1684 Any errors are signalled by raising errors.OpPrereqError.
1687 node_name = self.op.node_name
1690 dns_data = utils.HostInfo(node_name)
1692 node = dns_data.name
1693 primary_ip = self.op.primary_ip = dns_data.ip
1694 secondary_ip = getattr(self.op, "secondary_ip", None)
1695 if secondary_ip is None:
1696 secondary_ip = primary_ip
1697 if not utils.IsValidIP(secondary_ip):
1698 raise errors.OpPrereqError("Invalid secondary IP given")
1699 self.op.secondary_ip = secondary_ip
1701 node_list = cfg.GetNodeList()
1702 if not self.op.readd and node in node_list:
1703 raise errors.OpPrereqError("Node %s is already in the configuration" %
1705 elif self.op.readd and node not in node_list:
1706 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1708 for existing_node_name in node_list:
1709 existing_node = cfg.GetNodeInfo(existing_node_name)
1711 if self.op.readd and node == existing_node_name:
1712 if (existing_node.primary_ip != primary_ip or
1713 existing_node.secondary_ip != secondary_ip):
1714 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1715 " address configuration as before")
1718 if (existing_node.primary_ip == primary_ip or
1719 existing_node.secondary_ip == primary_ip or
1720 existing_node.primary_ip == secondary_ip or
1721 existing_node.secondary_ip == secondary_ip):
1722 raise errors.OpPrereqError("New node ip address(es) conflict with"
1723 " existing node %s" % existing_node.name)
1725 # check that the type of the node (single versus dual homed) is the
1726 # same as for the master
1727 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1728 master_singlehomed = myself.secondary_ip == myself.primary_ip
1729 newbie_singlehomed = secondary_ip == primary_ip
1730 if master_singlehomed != newbie_singlehomed:
1731 if master_singlehomed:
1732 raise errors.OpPrereqError("The master has no private ip but the"
1733 " new node has one")
1735 raise errors.OpPrereqError("The master has a private ip but the"
1736 " new node doesn't have one")
1738 # checks reachability
1739 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1740 raise errors.OpPrereqError("Node not reachable by ping")
1742 if not newbie_singlehomed:
1743 # check reachability from my secondary ip to newbie's secondary ip
1744 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1745 source=myself.secondary_ip):
1746 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1747 " based ping to noded port")
1749 self.new_node = objects.Node(name=node,
1750 primary_ip=primary_ip,
1751 secondary_ip=secondary_ip)
1753 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1754 if not os.path.exists(constants.VNC_PASSWORD_FILE):
1755 raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1756 constants.VNC_PASSWORD_FILE)
1758 def Exec(self, feedback_fn):
1759 """Adds the new node to the cluster.
1762 new_node = self.new_node
1763 node = new_node.name
1765 # set up inter-node password and certificate and restarts the node daemon
1766 gntpass = self.sstore.GetNodeDaemonPassword()
1767 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1768 raise errors.OpExecError("ganeti password corruption detected")
1769 f = open(constants.SSL_CERT_FILE)
1771 gntpem = f.read(8192)
1774 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1775 # so we use this to detect an invalid certificate; as long as the
1776 # cert doesn't contain this, the here-document will be correctly
1777 # parsed by the shell sequence below
1778 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1779 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1780 if not gntpem.endswith("\n"):
1781 raise errors.OpExecError("PEM must end with newline")
1782 logger.Info("copying cluster password to %s and starting the node daemon" % node)
1784 # and then connect with ssh to set password and start ganeti-noded
1785 # note that all the below variables are sanitized at this point,
1786 # either by being constants or by the checks above
1788 mycommand = ("umask 077 && "
1789 "echo '%s' > '%s' && "
1790 "cat > '%s' << '!EOF.' && \n"
1791 "%s!EOF.\n%s restart" %
1792 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1793 constants.SSL_CERT_FILE, gntpem,
1794 constants.NODE_INITD_SCRIPT))
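# Roughly, the generated remote command expands to the following
# (illustrative placeholders stand in for the real paths and secrets):
#   umask 077 &&
#   echo '<node password>' > '<noded password file>' &&
#   cat > '<SSL_CERT_FILE>' << '!EOF.' &&
#   <PEM data>!EOF.
#   <NODE_INITD_SCRIPT> restart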
1796 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1798 raise errors.OpExecError("Remote command on node %s, error: %s,"
1800 (node, result.fail_reason, result.output))
1802 # check connectivity
1805 result = rpc.call_version([node])[node]
1807 if constants.PROTOCOL_VERSION == result:
1808 logger.Info("communication to node %s fine, sw version %s match" %
1811 raise errors.OpExecError("Version mismatch master version %s,"
1812 " node version %s" %
1813 (constants.PROTOCOL_VERSION, result))
1815 raise errors.OpExecError("Cannot get version from the new node")
1818 logger.Info("copy ssh key to node %s" % node)
1819 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1821 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1822 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1828 keyarray.append(f.read())
1832 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1833 keyarray[3], keyarray[4], keyarray[5])
1836 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1838 # Add node to our /etc/hosts, and add key to known_hosts
1839 _AddHostToEtcHosts(new_node.name)
1841 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1842 self.cfg.GetHostKey())
1844 if new_node.secondary_ip != new_node.primary_ip:
1845 if not rpc.call_node_tcp_ping(new_node.name,
1846 constants.LOCALHOST_IP_ADDRESS,
1847 new_node.secondary_ip,
1848 constants.DEFAULT_NODED_PORT,
1850 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1851 " you gave (%s). Please fix and re-run this"
1852 " command." % new_node.secondary_ip)
1854 success, msg = ssh.VerifyNodeHostname(node)
1856 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1857 " than the one the resolver gives: %s."
1858 " Please fix and re-run this command." %
1861 # Distribute updated /etc/hosts and known_hosts to all nodes,
1862 # including the node just added
1863 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1864 dist_nodes = self.cfg.GetNodeList()
1865 if not self.op.readd:
1866 dist_nodes.append(node)
1867 if myself.name in dist_nodes:
1868 dist_nodes.remove(myself.name)
1870 logger.Debug("Copying hosts and known_hosts to all nodes")
1871 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1872 result = rpc.call_upload_file(dist_nodes, fname)
1873 for to_node in dist_nodes:
1874 if not result[to_node]:
1875 logger.Error("copy of file %s to node %s failed" %
1878 to_copy = ss.GetFileList()
1879 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1880 to_copy.append(constants.VNC_PASSWORD_FILE)
1881 for fname in to_copy:
1882 if not ssh.CopyFileToNode(node, fname):
1883 logger.Error("could not copy file %s to node %s" % (fname, node))
1885 if not self.op.readd:
1886 logger.Info("adding node %s to cluster.conf" % node)
1887 self.cfg.AddNode(new_node)
1890 class LUMasterFailover(LogicalUnit):
1891 """Failover the master node to the current node.
1893 This is a special LU in that it must run on a non-master node.
1896 HPATH = "master-failover"
1897 HTYPE = constants.HTYPE_CLUSTER
1901 def BuildHooksEnv(self):
1904 This will run on the new master only in the pre phase, and on all
1905 the nodes in the post phase.
1909 "OP_TARGET": self.new_master,
1910 "NEW_MASTER": self.new_master,
1911 "OLD_MASTER": self.old_master,
1913 return env, [self.new_master], self.cfg.GetNodeList()
1915 def CheckPrereq(self):
1916 """Check prerequisites.
1918 This checks that we are not already the master.
1921 self.new_master = utils.HostInfo().name
1922 self.old_master = self.sstore.GetMasterNode()
1924 if self.old_master == self.new_master:
1925 raise errors.OpPrereqError("This command must be run on the node"
1926 " where you want the new master to be."
1927 " %s is already the master" %
1930 def Exec(self, feedback_fn):
1931 """Failover the master node.
1933 This command, when run on a non-master node, will cause the current
1934 master to cease being master, and the non-master to become new
1938 #TODO: do not rely on gethostname returning the FQDN
1939 logger.Info("setting master to %s, old master: %s" %
1940 (self.new_master, self.old_master))
1942 if not rpc.call_node_stop_master(self.old_master):
1943 logger.Error("could not disable the master role on the old master"
1944 " %s, please disable manually" % self.old_master)
1947 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1948 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1949 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1950 logger.Error("could not distribute the new simple store master file"
1951 " to the other nodes, please check.")
1953 if not rpc.call_node_start_master(self.new_master):
1954 logger.Error("could not start the master role on the new master"
1955 " %s, please check" % self.new_master)
1956 feedback_fn("Error in activating the master IP on the new master,"
1957 " please fix manually.")
1961 class LUQueryClusterInfo(NoHooksLU):
1962 """Query cluster configuration.
1968 def CheckPrereq(self):
1969 """No prerequsites needed for this LU.
1974 def Exec(self, feedback_fn):
1975 """Return cluster config.
1979 "name": self.sstore.GetClusterName(),
1980 "software_version": constants.RELEASE_VERSION,
1981 "protocol_version": constants.PROTOCOL_VERSION,
1982 "config_version": constants.CONFIG_VERSION,
1983 "os_api_version": constants.OS_API_VERSION,
1984 "export_version": constants.EXPORT_VERSION,
1985 "master": self.sstore.GetMasterNode(),
1986 "architecture": (platform.architecture()[0], platform.machine()),
1987 "hypervisor_type": self.sstore.GetHypervisorType(),
1993 class LUClusterCopyFile(NoHooksLU):
1994 """Copy file to cluster.
1997 _OP_REQP = ["nodes", "filename"]
1999 def CheckPrereq(self):
2000 """Check prerequisites.
2002 It should check that the named file exists and that the given list
2006 if not os.path.exists(self.op.filename):
2007 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2009 self.nodes = _GetWantedNodes(self, self.op.nodes)
2011 def Exec(self, feedback_fn):
2012 """Copy a file from master to some nodes.
2015 opts - class with options as members
2016 args - list containing a single element, the file name
2018 nodes - list containing the name of target nodes; if empty, all nodes
2021 filename = self.op.filename
2023 myname = utils.HostInfo().name
2025 for node in self.nodes:
2028 if not ssh.CopyFileToNode(node, filename):
2029 logger.Error("Copy of file %s to node %s failed" % (filename, node))
2032 class LUDumpClusterConfig(NoHooksLU):
2033 """Return a text-representation of the cluster-config.
2038 def CheckPrereq(self):
2039 """No prerequisites.
2044 def Exec(self, feedback_fn):
2045 """Dump a representation of the cluster config to the standard output.
2048 return self.cfg.DumpConfig()
2051 class LURunClusterCommand(NoHooksLU):
2052 """Run a command on some nodes.
2055 _OP_REQP = ["command", "nodes"]
2057 def CheckPrereq(self):
2058 """Check prerequisites.
2060 It checks that the given list of nodes is valid.
2063 self.nodes = _GetWantedNodes(self, self.op.nodes)
2065 def Exec(self, feedback_fn):
2066 """Run a command on some nodes.
2069 # put the master at the end of the nodes list
2070 master_node = self.sstore.GetMasterNode()
2071 if master_node in self.nodes:
2072 self.nodes.remove(master_node)
2073 self.nodes.append(master_node)
2076 for node in self.nodes:
2077 result = ssh.SSHCall(node, "root", self.op.command)
2078 data.append((node, result.output, result.exit_code))
2083 class LUActivateInstanceDisks(NoHooksLU):
2084 """Bring up an instance's disks.
2087 _OP_REQP = ["instance_name"]
2089 def CheckPrereq(self):
2090 """Check prerequisites.
2092 This checks that the instance is in the cluster.
2095 instance = self.cfg.GetInstanceInfo(
2096 self.cfg.ExpandInstanceName(self.op.instance_name))
2097 if instance is None:
2098 raise errors.OpPrereqError("Instance '%s' not known" %
2099 self.op.instance_name)
2100 self.instance = instance
2103 def Exec(self, feedback_fn):
2104 """Activate the disks.
2107 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2109 raise errors.OpExecError("Cannot activate block devices")
2114 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2115 """Prepare the block devices for an instance.
2117 This sets up the block devices on all nodes.
2120 instance: a ganeti.objects.Instance object
2121 ignore_secondaries: if true, errors on secondary nodes won't result
2122 in an error return from the function
2125 false if the operation failed
2126 list of (host, instance_visible_name, node_visible_name) if the operation
2127 succeeded with the mapping from node devices to instance devices
2131 iname = instance.name
2132 # With the two passes mechanism we try to reduce the window of
2133 # opportunity for the race condition of switching DRBD to primary
2134 # before handshaking occurred, but we do not eliminate it
2136 # The proper fix would be to wait (with some limits) until the
2137 # connection has been made and drbd transitions from WFConnection
2138 # into any other network-connected state (Connected, SyncTarget, SyncSource, etc.)
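# An illustrative sketch of that wait, kept as comments only (the RPC used
# below, rpc.call_blockdev_connection_state, is a hypothetical helper, not a
# real part of the rpc module):
#
#   for _ in range(60):
#     state = rpc.call_blockdev_connection_state(node, node_disk)
#     if state != "WFConnection":
#       break
#     time.sleep(1)  # would also need 'import time'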
2141 # 1st pass, assemble on all nodes in secondary mode
2142 for inst_disk in instance.disks:
2143 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2144 cfg.SetDiskID(node_disk, node)
2145 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2147 logger.Error("could not prepare block device %s on node %s"
2148 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2149 if not ignore_secondaries:
2152 # FIXME: race condition on drbd migration to primary
2154 # 2nd pass, do only the primary node
2155 for inst_disk in instance.disks:
2156 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2157 if node != instance.primary_node:
2159 cfg.SetDiskID(node_disk, node)
2160 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2162 logger.Error("could not prepare block device %s on node %s"
2163 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2165 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2167 # leave the disks configured for the primary node
2168 # this is a workaround that would be fixed better by
2169 # improving the logical/physical id handling
2170 for disk in instance.disks:
2171 cfg.SetDiskID(disk, instance.primary_node)
2173 return disks_ok, device_info
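# Illustrative return value (node name and device paths are made up): for a
# two-disk DRBD instance, device_info would look roughly like
#   [("node1.example.com", "sda", "/dev/drbd0"),
#    ("node1.example.com", "sdb", "/dev/drbd1")]
# i.e. one (primary node, instance-visible name, node-visible device) tuple
# per disk, built during the second pass above.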
2176 def _StartInstanceDisks(cfg, instance, force):
2177 """Start the disks of an instance.
2180 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2181 ignore_secondaries=force)
2183 _ShutdownInstanceDisks(instance, cfg)
2184 if force is not None and not force:
2185 logger.Error("If the message above refers to a secondary node,"
2186 " you can retry the operation using '--force'.")
2187 raise errors.OpExecError("Disk consistency error")
2190 class LUDeactivateInstanceDisks(NoHooksLU):
2191 """Shutdown an instance's disks.
2194 _OP_REQP = ["instance_name"]
2196 def CheckPrereq(self):
2197 """Check prerequisites.
2199 This checks that the instance is in the cluster.
2202 instance = self.cfg.GetInstanceInfo(
2203 self.cfg.ExpandInstanceName(self.op.instance_name))
2204 if instance is None:
2205 raise errors.OpPrereqError("Instance '%s' not known" %
2206 self.op.instance_name)
2207 self.instance = instance
2209 def Exec(self, feedback_fn):
2210 """Deactivate the disks
2213 instance = self.instance
2214 ins_l = rpc.call_instance_list([instance.primary_node])
2215 ins_l = ins_l[instance.primary_node]
2216 if not type(ins_l) is list:
2217 raise errors.OpExecError("Can't contact node '%s'" %
2218 instance.primary_node)
2220 if self.instance.name in ins_l:
2221 raise errors.OpExecError("Instance is running, can't shutdown"
2224 _ShutdownInstanceDisks(instance, self.cfg)
2227 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2228 """Shutdown block devices of an instance.
2230 This does the shutdown on all nodes of the instance.
2232 If ignore_primary is false, errors on the primary node cause the shutdown to be reported as failed.
2237 for disk in instance.disks:
2238 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2239 cfg.SetDiskID(top_disk, node)
2240 if not rpc.call_blockdev_shutdown(node, top_disk):
2241 logger.Error("could not shutdown block device %s on node %s" %
2242 (disk.iv_name, node))
2243 if not ignore_primary or node != instance.primary_node:
2248 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2249 """Checks if a node has enough free memory.
2251 This function checks if a given node has the needed amount of free
2252 memory. In case the node has less memory or we cannot get the
2253 information from the node, this function raises an OpPrereqError exception.
2257 - cfg: a ConfigWriter instance
2258 - node: the node name
2259 - reason: string to use in the error message
2260 - requested: the amount of memory in MiB
2263 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2264 if not nodeinfo or not isinstance(nodeinfo, dict):
2265 raise errors.OpPrereqError("Could not contact node %s for resource"
2266 " information" % (node,))
2268 free_mem = nodeinfo[node].get('memory_free')
2269 if not isinstance(free_mem, int):
2270 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2271 " was '%s'" % (node, free_mem))
2272 if requested > free_mem:
2273 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2274 " needed %s MiB, available %s MiB" %
2275 (node, reason, requested, free_mem))
2278 class LUStartupInstance(LogicalUnit):
2279 """Starts an instance.
2282 HPATH = "instance-start"
2283 HTYPE = constants.HTYPE_INSTANCE
2284 _OP_REQP = ["instance_name", "force"]
2286 def BuildHooksEnv(self):
2289 This runs on master, primary and secondary nodes of the instance.
2293 "FORCE": self.op.force,
2295 env.update(_BuildInstanceHookEnvByObject(self.instance))
2296 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2297 list(self.instance.secondary_nodes))
2300 def CheckPrereq(self):
2301 """Check prerequisites.
2303 This checks that the instance is in the cluster.
2306 instance = self.cfg.GetInstanceInfo(
2307 self.cfg.ExpandInstanceName(self.op.instance_name))
2308 if instance is None:
2309 raise errors.OpPrereqError("Instance '%s' not known" %
2310 self.op.instance_name)
2312 # check bridges existence
2313 _CheckInstanceBridgesExist(instance)
2315 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2316 "starting instance %s" % instance.name,
2319 self.instance = instance
2320 self.op.instance_name = instance.name
2322 def Exec(self, feedback_fn):
2323 """Start the instance.
2326 instance = self.instance
2327 force = self.op.force
2328 extra_args = getattr(self.op, "extra_args", "")
2330 self.cfg.MarkInstanceUp(instance.name)
2332 node_current = instance.primary_node
2334 _StartInstanceDisks(self.cfg, instance, force)
2336 if not rpc.call_instance_start(node_current, instance, extra_args):
2337 _ShutdownInstanceDisks(instance, self.cfg)
2338 raise errors.OpExecError("Could not start instance")
2341 class LURebootInstance(LogicalUnit):
2342 """Reboot an instance.
2345 HPATH = "instance-reboot"
2346 HTYPE = constants.HTYPE_INSTANCE
2347 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2349 def BuildHooksEnv(self):
2352 This runs on master, primary and secondary nodes of the instance.
2356 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2358 env.update(_BuildInstanceHookEnvByObject(self.instance))
2359 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2360 list(self.instance.secondary_nodes))
2363 def CheckPrereq(self):
2364 """Check prerequisites.
2366 This checks that the instance is in the cluster.
2369 instance = self.cfg.GetInstanceInfo(
2370 self.cfg.ExpandInstanceName(self.op.instance_name))
2371 if instance is None:
2372 raise errors.OpPrereqError("Instance '%s' not known" %
2373 self.op.instance_name)
2375 # check bridges existence
2376 _CheckInstanceBridgesExist(instance)
2378 self.instance = instance
2379 self.op.instance_name = instance.name
2381 def Exec(self, feedback_fn):
2382 """Reboot the instance.
2385 instance = self.instance
2386 ignore_secondaries = self.op.ignore_secondaries
2387 reboot_type = self.op.reboot_type
2388 extra_args = getattr(self.op, "extra_args", "")
2390 node_current = instance.primary_node
2392 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2393 constants.INSTANCE_REBOOT_HARD,
2394 constants.INSTANCE_REBOOT_FULL]:
2395 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2396 (constants.INSTANCE_REBOOT_SOFT,
2397 constants.INSTANCE_REBOOT_HARD,
2398 constants.INSTANCE_REBOOT_FULL))
2400 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2401 constants.INSTANCE_REBOOT_HARD]:
2402 if not rpc.call_instance_reboot(node_current, instance,
2403 reboot_type, extra_args):
2404 raise errors.OpExecError("Could not reboot instance")
2405 else:
2406 if not rpc.call_instance_shutdown(node_current, instance):
2407 raise errors.OpExecError("could not shut down instance for full reboot")
2408 _ShutdownInstanceDisks(instance, self.cfg)
2409 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2410 if not rpc.call_instance_start(node_current, instance, extra_args):
2411 _ShutdownInstanceDisks(instance, self.cfg)
2412 raise errors.OpExecError("Could not start instance for full reboot")
2414 self.cfg.MarkInstanceUp(instance.name)
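# Note on the modes handled above: SOFT and HARD reboots are delegated to the
# node daemon through rpc.call_instance_reboot, while FULL is emulated from
# the master by shutting the instance down, cycling its disks and starting it
# again.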
2417 class LUShutdownInstance(LogicalUnit):
2418 """Shutdown an instance.
2421 HPATH = "instance-stop"
2422 HTYPE = constants.HTYPE_INSTANCE
2423 _OP_REQP = ["instance_name"]
2425 def BuildHooksEnv(self):
2428 This runs on master, primary and secondary nodes of the instance.
2431 env = _BuildInstanceHookEnvByObject(self.instance)
2432 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2433 list(self.instance.secondary_nodes))
2436 def CheckPrereq(self):
2437 """Check prerequisites.
2439 This checks that the instance is in the cluster.
2442 instance = self.cfg.GetInstanceInfo(
2443 self.cfg.ExpandInstanceName(self.op.instance_name))
2444 if instance is None:
2445 raise errors.OpPrereqError("Instance '%s' not known" %
2446 self.op.instance_name)
2447 self.instance = instance
2449 def Exec(self, feedback_fn):
2450 """Shutdown the instance.
2453 instance = self.instance
2454 node_current = instance.primary_node
2455 self.cfg.MarkInstanceDown(instance.name)
2456 if not rpc.call_instance_shutdown(node_current, instance):
2457 logger.Error("could not shutdown instance")
2459 _ShutdownInstanceDisks(instance, self.cfg)
2462 class LUReinstallInstance(LogicalUnit):
2463 """Reinstall an instance.
2466 HPATH = "instance-reinstall"
2467 HTYPE = constants.HTYPE_INSTANCE
2468 _OP_REQP = ["instance_name"]
2470 def BuildHooksEnv(self):
2473 This runs on master, primary and secondary nodes of the instance.
2476 env = _BuildInstanceHookEnvByObject(self.instance)
2477 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2478 list(self.instance.secondary_nodes))
2481 def CheckPrereq(self):
2482 """Check prerequisites.
2484 This checks that the instance is in the cluster and is not running.
2487 instance = self.cfg.GetInstanceInfo(
2488 self.cfg.ExpandInstanceName(self.op.instance_name))
2489 if instance is None:
2490 raise errors.OpPrereqError("Instance '%s' not known" %
2491 self.op.instance_name)
2492 if instance.disk_template == constants.DT_DISKLESS:
2493 raise errors.OpPrereqError("Instance '%s' has no disks" %
2494 self.op.instance_name)
2495 if instance.status != "down":
2496 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2497 self.op.instance_name)
2498 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2500 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2501 (self.op.instance_name,
2502 instance.primary_node))
2504 self.op.os_type = getattr(self.op, "os_type", None)
2505 if self.op.os_type is not None:
2507 pnode = self.cfg.GetNodeInfo(
2508 self.cfg.ExpandNodeName(instance.primary_node))
2510 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2512 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2514 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2515 " primary node" % self.op.os_type)
2517 self.instance = instance
2519 def Exec(self, feedback_fn):
2520 """Reinstall the instance.
2523 inst = self.instance
2525 if self.op.os_type is not None:
2526 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2527 inst.os = self.op.os_type
2528 self.cfg.AddInstance(inst)
2530 _StartInstanceDisks(self.cfg, inst, None)
2532 feedback_fn("Running the instance OS create scripts...")
2533 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2534 raise errors.OpExecError("Could not install OS for instance %s"
2536 (inst.name, inst.primary_node))
2538 _ShutdownInstanceDisks(inst, self.cfg)
2541 class LURenameInstance(LogicalUnit):
2542 """Rename an instance.
2545 HPATH = "instance-rename"
2546 HTYPE = constants.HTYPE_INSTANCE
2547 _OP_REQP = ["instance_name", "new_name"]
2549 def BuildHooksEnv(self):
2552 This runs on master, primary and secondary nodes of the instance.
2555 env = _BuildInstanceHookEnvByObject(self.instance)
2556 env["INSTANCE_NEW_NAME"] = self.op.new_name
2557 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2558 list(self.instance.secondary_nodes))
2561 def CheckPrereq(self):
2562 """Check prerequisites.
2564 This checks that the instance is in the cluster and is not running.
2567 instance = self.cfg.GetInstanceInfo(
2568 self.cfg.ExpandInstanceName(self.op.instance_name))
2569 if instance is None:
2570 raise errors.OpPrereqError("Instance '%s' not known" %
2571 self.op.instance_name)
2572 if instance.status != "down":
2573 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2574 self.op.instance_name)
2575 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2577 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2578 (self.op.instance_name,
2579 instance.primary_node))
2580 self.instance = instance
2582 # new name verification
2583 name_info = utils.HostInfo(self.op.new_name)
2585 self.op.new_name = new_name = name_info.name
2586 instance_list = self.cfg.GetInstanceList()
2587 if new_name in instance_list:
2588 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2591 if not getattr(self.op, "ignore_ip", False):
2592 command = ["fping", "-q", name_info.ip]
2593 result = utils.RunCmd(command)
2594 if not result.failed:
2595 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2596 (name_info.ip, new_name))
2599 def Exec(self, feedback_fn):
2600 """Reinstall the instance.
2603 inst = self.instance
2604 old_name = inst.name
2606 self.cfg.RenameInstance(inst.name, self.op.new_name)
2608 # re-read the instance from the configuration after rename
2609 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2611 _StartInstanceDisks(self.cfg, inst, None)
2613 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2615 msg = ("Could run OS rename script for instance %s on node %s (but the"
2616 " instance has been renamed in Ganeti)" %
2617 (inst.name, inst.primary_node))
2620 _ShutdownInstanceDisks(inst, self.cfg)
2623 class LURemoveInstance(LogicalUnit):
2624 """Remove an instance.
2627 HPATH = "instance-remove"
2628 HTYPE = constants.HTYPE_INSTANCE
2629 _OP_REQP = ["instance_name", "ignore_failures"]
2631 def BuildHooksEnv(self):
2634 This runs on master, primary and secondary nodes of the instance.
2637 env = _BuildInstanceHookEnvByObject(self.instance)
2638 nl = [self.sstore.GetMasterNode()]
2641 def CheckPrereq(self):
2642 """Check prerequisites.
2644 This checks that the instance is in the cluster.
2647 instance = self.cfg.GetInstanceInfo(
2648 self.cfg.ExpandInstanceName(self.op.instance_name))
2649 if instance is None:
2650 raise errors.OpPrereqError("Instance '%s' not known" %
2651 self.op.instance_name)
2652 self.instance = instance
2654 def Exec(self, feedback_fn):
2655 """Remove the instance.
2658 instance = self.instance
2659 logger.Info("shutting down instance %s on node %s" %
2660 (instance.name, instance.primary_node))
2662 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2663 if self.op.ignore_failures:
2664 feedback_fn("Warning: can't shutdown instance")
2666 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2667 (instance.name, instance.primary_node))
2669 logger.Info("removing block devices for instance %s" % instance.name)
2671 if not _RemoveDisks(instance, self.cfg):
2672 if self.op.ignore_failures:
2673 feedback_fn("Warning: can't remove instance's disks")
2675 raise errors.OpExecError("Can't remove instance's disks")
2677 logger.Info("removing instance %s out of cluster config" % instance.name)
2679 self.cfg.RemoveInstance(instance.name)
2682 class LUQueryInstances(NoHooksLU):
2683 """Logical unit for querying instances.
2686 _OP_REQP = ["output_fields", "names"]
2688 def CheckPrereq(self):
2689 """Check prerequisites.
2691 This checks that the fields required are valid output fields.
2694 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2695 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2696 "admin_state", "admin_ram",
2697 "disk_template", "ip", "mac", "bridge",
2698 "sda_size", "sdb_size", "vcpus", "tags"],
2699 dynamic=self.dynamic_fields,
2700 selected=self.op.output_fields)
2702 self.wanted = _GetWantedInstances(self, self.op.names)
2704 def Exec(self, feedback_fn):
2705 """Computes the list of nodes and their attributes.
2708 instance_names = self.wanted
2709 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2712 # begin data gathering
2714 nodes = frozenset([inst.primary_node for inst in instance_list])
2717 if self.dynamic_fields.intersection(self.op.output_fields):
2719 node_data = rpc.call_all_instances_info(nodes)
2721 result = node_data[name]
2723 live_data.update(result)
2724 elif result == False:
2725 bad_nodes.append(name)
2726 # else no instance is alive
2728 live_data = dict([(name, {}) for name in instance_names])
2730 # end data gathering
2733 for instance in instance_list:
2735 for field in self.op.output_fields:
2740 elif field == "pnode":
2741 val = instance.primary_node
2742 elif field == "snodes":
2743 val = list(instance.secondary_nodes)
2744 elif field == "admin_state":
2745 val = (instance.status != "down")
2746 elif field == "oper_state":
2747 if instance.primary_node in bad_nodes:
2750 val = bool(live_data.get(instance.name))
2751 elif field == "status":
2752 if instance.primary_node in bad_nodes:
2753 val = "ERROR_nodedown"
2755 running = bool(live_data.get(instance.name))
2757 if instance.status != "down":
2762 if instance.status != "down":
2766 elif field == "admin_ram":
2767 val = instance.memory
2768 elif field == "oper_ram":
2769 if instance.primary_node in bad_nodes:
2771 elif instance.name in live_data:
2772 val = live_data[instance.name].get("memory", "?")
2775 elif field == "disk_template":
2776 val = instance.disk_template
2778 val = instance.nics[0].ip
2779 elif field == "bridge":
2780 val = instance.nics[0].bridge
2781 elif field == "mac":
2782 val = instance.nics[0].mac
2783 elif field == "sda_size" or field == "sdb_size":
2784 disk = instance.FindDisk(field[:3])
2789 elif field == "vcpus":
2790 val = instance.vcpus
2791 elif field == "tags":
2792 val = list(instance.GetTags())
2794 raise errors.ParameterError(field)
2801 class LUFailoverInstance(LogicalUnit):
2802 """Failover an instance.
2805 HPATH = "instance-failover"
2806 HTYPE = constants.HTYPE_INSTANCE
2807 _OP_REQP = ["instance_name", "ignore_consistency"]
2809 def BuildHooksEnv(self):
2812 This runs on master, primary and secondary nodes of the instance.
2816 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2818 env.update(_BuildInstanceHookEnvByObject(self.instance))
2819 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2822 def CheckPrereq(self):
2823 """Check prerequisites.
2825 This checks that the instance is in the cluster.
2828 instance = self.cfg.GetInstanceInfo(
2829 self.cfg.ExpandInstanceName(self.op.instance_name))
2830 if instance is None:
2831 raise errors.OpPrereqError("Instance '%s' not known" %
2832 self.op.instance_name)
2834 if instance.disk_template not in constants.DTS_NET_MIRROR:
2835 raise errors.OpPrereqError("Instance's disk layout is not"
2836 " network mirrored, cannot failover.")
2838 secondary_nodes = instance.secondary_nodes
2839 if not secondary_nodes:
2840 raise errors.ProgrammerError("no secondary node but using "
2841 "DT_REMOTE_RAID1 template")
2843 target_node = secondary_nodes[0]
2844 # check memory requirements on the secondary node
2845 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2846 instance.name, instance.memory)
2848 # check bridge existence
2849 brlist = [nic.bridge for nic in instance.nics]
2850 if not rpc.call_bridges_exist(target_node, brlist):
2851 raise errors.OpPrereqError("One or more target bridges %s do not"
2852 " exist on destination node '%s'" %
2853 (brlist, target_node))
2855 self.instance = instance
2857 def Exec(self, feedback_fn):
2858 """Failover an instance.
2860 The failover is done by shutting it down on its present node and
2861 starting it on the secondary.
2864 instance = self.instance
2866 source_node = instance.primary_node
2867 target_node = instance.secondary_nodes[0]
2869 feedback_fn("* checking disk consistency between source and target")
2870 for dev in instance.disks:
2871 # for remote_raid1, these are md over drbd
2872 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2873 if instance.status == "up" and not self.op.ignore_consistency:
2874 raise errors.OpExecError("Disk %s is degraded on target node,"
2875 " aborting failover." % dev.iv_name)
2877 feedback_fn("* shutting down instance on source node")
2878 logger.Info("Shutting down instance %s on node %s" %
2879 (instance.name, source_node))
2881 if not rpc.call_instance_shutdown(source_node, instance):
2882 if self.op.ignore_consistency:
2883 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2884 " anyway. Please make sure node %s is down" %
2885 (instance.name, source_node, source_node))
2887 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2888 (instance.name, source_node))
2890 feedback_fn("* deactivating the instance's disks on source node")
2891 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2892 raise errors.OpExecError("Can't shut down the instance's disks.")
2894 instance.primary_node = target_node
2895 # distribute new instance config to the other nodes
2896 self.cfg.Update(instance)
2898 # Only start the instance if it's marked as up
2899 if instance.status == "up":
2900 feedback_fn("* activating the instance's disks on target node")
2901 logger.Info("Starting instance %s on node %s" %
2902 (instance.name, target_node))
2904 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2905 ignore_secondaries=True)
2907 _ShutdownInstanceDisks(instance, self.cfg)
2908 raise errors.OpExecError("Can't activate the instance's disks")
2910 feedback_fn("* starting the instance on the target node")
2911 if not rpc.call_instance_start(target_node, instance, None):
2912 _ShutdownInstanceDisks(instance, self.cfg)
2913 raise errors.OpExecError("Could not start instance %s on node %s." %
2914 (instance.name, target_node))
2917 class LUMigrateInstance(LogicalUnit):
2918 """Migrate an instance.
2920 This is migration without shutting down, compared to the failover,
2921 which is done with shutdown.
2924 HPATH = "instance-migrate"
2925 HTYPE = constants.HTYPE_INSTANCE
2926 _OP_REQP = ["instance_name", "live"]
2928 def BuildHooksEnv(self):
2931 This runs on master, primary and secondary nodes of the instance.
2934 env = _BuildInstanceHookEnvByObject(self.instance)
2935 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2938 def CheckPrereq(self):
2939 """Check prerequisites.
2941 This checks that the instance is in the cluster.
2944 instance = self.cfg.GetInstanceInfo(
2945 self.cfg.ExpandInstanceName(self.op.instance_name))
2946 if instance is None:
2947 raise errors.OpPrereqError("Instance '%s' not known" %
2948 self.op.instance_name)
2950 if instance.disk_template != constants.DT_DRBD8:
2951 raise errors.OpPrereqError("Instance's disk layout is not"
2952 " drbd8, cannot migrate.")
2954 secondary_nodes = instance.secondary_nodes
2955 if not secondary_nodes:
2956 raise errors.ProgrammerError("no secondary node but using "
2957 "drbd8 disk template")
2959 target_node = secondary_nodes[0]
2960 # check memory requirements on the secondary node
2961 _CheckNodeFreeMemory(self.cfg, target_node, "migrating instance %s" %
2962 instance.name, instance.memory)
2964 # check bridge existence
2965 brlist = [nic.bridge for nic in instance.nics]
2966 if not rpc.call_bridges_exist(target_node, brlist):
2967 raise errors.OpPrereqError("One or more target bridges %s do not"
2968 " exist on destination node '%s'" %
2969 (brlist, target_node))
2971 ins_l = rpc.call_instance_list([instance.primary_node])
2972 ins_l = ins_l[instance.primary_node]
2973 if not type(ins_l) is list:
2974 raise errors.OpPrereqError("Can't contact node '%s'" %
2975 instance.primary_node)
2977 if instance.name not in ins_l:
2978 raise errors.OpPrereqError("Instance is not running, can't migrate"
2979 " - please use failover instead")
2980 self.instance = instance
2982 def Exec(self, feedback_fn):
2983 """Migrate an instance.
2985 The migration is done by:
2986 - change the disks into dual-master mode
2987 - wait until disks are fully synchronized again
2988 - migrate the instance
2989 - change disks on the new secondary node (the old primary) to secondary
2990 - wait until disks are fully synchronized
2991 - change disks into single-master mode
2994 instance = self.instance
2996 source_node = instance.primary_node
2997 target_node = instance.secondary_nodes[0]
2998 all_nodes = [source_node, target_node]
3000 feedback_fn("* checking disk consistency between source and target")
3001 for dev in instance.disks:
3002 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
3003 raise errors.OpExecError("Disk %s is degraded or not fully"
3004 " synchronized on target node,"
3005 " aborting migrate." % dev.iv_name)
3007 feedback_fn("* ensuring the target is in secondary mode")
3008 result = rpc.call_blockdev_close(target_node, instance.name,
3010 if not result or not result[0]:
3011 raise errors.BlockDeviceError("Cannot switch target node to"
3012 " secondary: %s" % result[1])
3014 feedback_fn("* changing disks into dual-master mode")
3015 logger.Info("Changing disks for instance %s into dual-master mode" %
3019 source_node: self.cfg.GetNodeInfo(source_node).secondary_ip,
3020 target_node: self.cfg.GetNodeInfo(target_node).secondary_ip,
3022 result = rpc.call_drbd_reconfig_net(all_nodes, instance.name,
3023 instance.disks, nodes_ip, True)
3024 for node in all_nodes:
3025 if not result[node] or not result[node][0]:
3026 raise errors.OpExecError("Cannot change disks config on node %s,"
3027 " error %s" % (node, result[node][1]))
3029 _WaitForSync(self.cfg, instance, self.proc)
3031 feedback_fn("* migrating instance to %s" % target_node)
3032 result = rpc.call_instance_migrate(source_node, instance,
3033 nodes_ip[target_node], self.op.live)
3034 if not result or not result[0]:
3035 logger.Error("Instance migration failed, trying to revert disk status")
3036 res2 = rpc.call_blockdev_close(target_node, instance.name,
3038 if not res2 or not res2[0]:
3039 feedback_fn("Disk close on secondary failed, instance disks left"
3040 " in dual-master mode")
3042 res2 = rpc.call_drbd_reconfig_net(all_nodes, instance.name,
3043 instance.disks, nodes_ip, False)
3044 if not (res2 and res2[source_node][0] and res2[target_node][0]):
3045 logger.Error("Warning: DRBD change to single-master failed: shutdown"
3046 " manually the instance disks or via deactivate-disks,"
3047 " and then restart them via activate-disks")
3048 raise errors.OpExecError("Could not migrate instance %s: %s" %
3049 (instance.name, result[1]))
3051 instance.primary_node = target_node
3052 # distribute new instance config to the other nodes
3053 self.cfg.Update(instance)
3055 feedback_fn("* changing the instance's disks on source node to secondary")
3056 result = rpc.call_blockdev_close(source_node, instance.name,
3058 if not result or not result[0]:
3059 logger.Error("Warning: DRBD deactivation failed: shutdown manually the"
3060 " instance disks or via deactivate-disks, and then"
3061 " restart them via activate-disks")
3063 _WaitForSync(self.cfg, instance, self.proc)
3065 feedback_fn("* changing the instance's disks to single-master")
3066 result = rpc.call_drbd_reconfig_net(all_nodes, instance.name,
3067 instance.disks, nodes_ip, False)
3068 if not (result and result[source_node][0] and result[target_node][0]):
3069 logger.Error("Warning: DRBD change to single-master failed: shutdown"
3070 " manually the instance disks or via deactivate-disks,"
3071 " and then restart them via activate-disks")
3074 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
3075 """Create a tree of block devices on the primary node.
3077 This always creates all devices.
3081 for child in device.children:
3082 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
3085 cfg.SetDiskID(device, node)
3086 new_id = rpc.call_blockdev_create(node, device, device.size,
3087 instance.name, True, info)
3090 if device.physical_id is None:
3091 device.physical_id = new_id
3095 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
3096 """Create a tree of block devices on a secondary node.
3098 If this device type has to be created on secondaries, create it and
3101 If not, just recurse to children keeping the same 'force' value.
3104 if device.CreateOnSecondary():
3107 for child in device.children:
3108 if not _CreateBlockDevOnSecondary(cfg, node, instance,
3109 child, force, info):
3114 cfg.SetDiskID(device, node)
3115 new_id = rpc.call_blockdev_create(node, device, device.size,
3116 instance.name, False, info)
3119 if device.physical_id is None:
3120 device.physical_id = new_id
3124 def _GenerateUniqueNames(cfg, exts):
3125 """Generate a suitable LV name.
3127 This will generate logical volume names based on the given extensions.
3132 new_id = cfg.GenerateUniqueID()
3133 results.append("%s%s" % (new_id, val))
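# Illustrative result (the IDs are placeholders): for exts [".sda", ".sdb"]
# this returns something like ["<uuid1>.sda", "<uuid2>.sdb"], each extension
# getting its own value from cfg.GenerateUniqueID(), so the LV names stay
# unique across the cluster.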
3137 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
3138 """Generate a drbd device complete with its children.
3141 port = cfg.AllocatePort()
3142 vgname = cfg.GetVGName()
3143 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3144 logical_id=(vgname, names[0]))
3145 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3146 logical_id=(vgname, names[1]))
3147 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
3148 logical_id = (primary, secondary, port),
3149 children = [dev_data, dev_meta])
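# Schematically, the branch built above is:
#   DRBD7 device, logical_id = (primary, secondary, allocated port)
#     +- data LV of the requested size
#     +- 128 MB metadata LV
# _GenerateDRBD8Branch below produces the same shape for DRBD8 devices.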
3153 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
3154 """Generate a drbd8 device complete with its children.
3157 port = cfg.AllocatePort()
3158 vgname = cfg.GetVGName()
3159 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3160 logical_id=(vgname, names[0]))
3161 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3162 logical_id=(vgname, names[1]))
3163 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3164 logical_id = (primary, secondary, port),
3165 children = [dev_data, dev_meta],
3169 def _GenerateDiskTemplate(cfg, template_name,
3170 instance_name, primary_node,
3171 secondary_nodes, disk_sz, swap_sz):
3172 """Generate the entire disk layout for a given template type.
3175 #TODO: compute space requirements
3177 vgname = cfg.GetVGName()
3178 if template_name == constants.DT_DISKLESS:
3180 elif template_name == constants.DT_PLAIN:
3181 if len(secondary_nodes) != 0:
3182 raise errors.ProgrammerError("Wrong template configuration")
3184 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3185 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3186 logical_id=(vgname, names[0]),
3188 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3189 logical_id=(vgname, names[1]),
3191 disks = [sda_dev, sdb_dev]
3192 elif template_name == constants.DT_LOCAL_RAID1:
3193 if len(secondary_nodes) != 0:
3194 raise errors.ProgrammerError("Wrong template configuration")
3197 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
3198 ".sdb_m1", ".sdb_m2"])
3199 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3200 logical_id=(vgname, names[0]))
3201 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3202 logical_id=(vgname, names[1]))
3203 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
3205 children = [sda_dev_m1, sda_dev_m2])
3206 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3207 logical_id=(vgname, names[2]))
3208 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3209 logical_id=(vgname, names[3]))
3210 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
3212 children = [sdb_dev_m1, sdb_dev_m2])
3213 disks = [md_sda_dev, md_sdb_dev]
3214 elif template_name == constants.DT_REMOTE_RAID1:
3215 if len(secondary_nodes) != 1:
3216 raise errors.ProgrammerError("Wrong template configuration")
3217 remote_node = secondary_nodes[0]
3218 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3219 ".sdb_data", ".sdb_meta"])
3220 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3221 disk_sz, names[0:2])
3222 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
3223 children = [drbd_sda_dev], size=disk_sz)
3224 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3225 swap_sz, names[2:4])
3226 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
3227 children = [drbd_sdb_dev], size=swap_sz)
3228 disks = [md_sda_dev, md_sdb_dev]
3229 elif template_name == constants.DT_DRBD8:
3230 if len(secondary_nodes) != 1:
3231 raise errors.ProgrammerError("Wrong template configuration")
3232 remote_node = secondary_nodes[0]
3233 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3234 ".sdb_data", ".sdb_meta"])
3235 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3236 disk_sz, names[0:2], "sda")
3237 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3238 swap_sz, names[2:4], "sdb")
3239 disks = [drbd_sda_dev, drbd_sdb_dev]
3241 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
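# Rough summary of the layouts generated above (sda = data disk, sdb = swap):
#   diskless     - no disks at all
#   plain        - one LV per disk on the primary node only
#   local_raid1  - MD RAID1 over two LVs per disk, primary node only
#   remote_raid1 - MD RAID1 over a DRBD7 device mirrored to the secondary
#   drbd8        - one DRBD8 device per disk mirrored to the secondary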
3245 def _GetInstanceInfoText(instance):
3246 """Compute that text that should be added to the disk's metadata.
3249 return "originstname+%s" % instance.name
3252 def _CreateDisks(cfg, instance):
3253 """Create all disks for an instance.
3255 This abstracts away some work from AddInstance.
3258 instance: the instance object
3261 True or False showing the success of the creation process
3264 info = _GetInstanceInfoText(instance)
3266 for device in instance.disks:
3267 logger.Info("creating volume %s for instance %s" %
3268 (device.iv_name, instance.name))
3270 for secondary_node in instance.secondary_nodes:
3271 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3272 device, False, info):
3273 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3274 (device.iv_name, device, secondary_node))
3277 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3278 instance, device, info):
3279 logger.Error("failed to create volume %s on primary!" %
3285 def _RemoveDisks(instance, cfg):
3286 """Remove all disks for an instance.
3288 This abstracts away some work from `AddInstance()` and
3289 `RemoveInstance()`. Note that in case some of the devices couldn't
3290 be removed, the removal will continue with the other ones (compare
3291 with `_CreateDisks()`).
3294 instance: the instance object
3297 True or False showing the success of the removal process
3300 logger.Info("removing block devices for instance %s" % instance.name)
3303 for device in instance.disks:
3304 for node, disk in device.ComputeNodeTree(instance.primary_node):
3305 cfg.SetDiskID(disk, node)
3306 if not rpc.call_blockdev_remove(node, disk):
3307 logger.Error("could not remove block device %s on node %s,"
3308 " continuing anyway" %
3309 (device.iv_name, node))
3314 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3315 """Compute disk size requirements in the volume group
3317 This is currently hard-coded for the two-drive layout.
3320 # Required free disk space as a function of disk and swap space
3322 constants.DT_DISKLESS: None,
3323 constants.DT_PLAIN: disk_size + swap_size,
3324 constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3325 # 256 MB are added for DRBD metadata, 128 MB for each DRBD device
3326 constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3327 constants.DT_DRBD8: disk_size + swap_size + 256,
3330 if disk_template not in req_size_dict:
3331 raise errors.ProgrammerError("Disk template '%s' size requirement"
3332 " is unknown" % disk_template)
3334 return req_size_dict[disk_template]
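# Worked example: a drbd8 instance with disk_size=10240 and swap_size=4096
# (both in MiB) needs 10240 + 4096 + 256 = 14592 MiB of free space in the
# volume group on the primary node and on the secondary node.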
3337 class LUCreateInstance(LogicalUnit):
3338 """Create an instance.
3341 HPATH = "instance-add"
3342 HTYPE = constants.HTYPE_INSTANCE
3343 _OP_REQP = ["instance_name", "mem_size", "disk_size",
3344 "disk_template", "swap_size", "mode", "start", "vcpus",
3345 "wait_for_sync", "ip_check", "mac"]
3347 def _RunAllocator(self):
3348 """Run the allocator based on input opcode.
3351 disks = [{"size": self.op.disk_size, "mode": "w"},
3352 {"size": self.op.swap_size, "mode": "w"}]
3353 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3354 "bridge": self.op.bridge}]
3355 ial = IAllocator(self.cfg, self.sstore,
3356 mode=constants.IALLOCATOR_MODE_ALLOC,
3357 name=self.op.instance_name,
3358 disk_template=self.op.disk_template,
3361 vcpus=self.op.vcpus,
3362 mem_size=self.op.mem_size,
3367 ial.Run(self.op.iallocator)
3370 raise errors.OpPrereqError("Can't compute nodes using"
3371 " iallocator '%s': %s" % (self.op.iallocator,
3373 if len(ial.nodes) != ial.required_nodes:
3374 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3375 " of nodes (%s), required %s" %
3376 (len(ial.nodes), ial.required_nodes))
3377 self.op.pnode = ial.nodes[0]
3378 logger.ToStdout("Selected nodes for the instance: %s" %
3379 (", ".join(ial.nodes),))
3380 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3381 (self.op.instance_name, self.op.iallocator, ial.nodes))
3382 if ial.required_nodes == 2:
3383 self.op.snode = ial.nodes[1]
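# Note: for mirrored disk templates the allocator is expected to return two
# nodes, e.g. ["node2.example.com", "node3.example.com"] (hypothetical names);
# the first becomes the primary and the second, when required_nodes == 2, the
# secondary node of the new instance.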
3385 def BuildHooksEnv(self):
3388 This runs on master, primary and secondary nodes of the instance.
3392 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3393 "INSTANCE_DISK_SIZE": self.op.disk_size,
3394 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3395 "INSTANCE_ADD_MODE": self.op.mode,
3397 if self.op.mode == constants.INSTANCE_IMPORT:
3398 env["INSTANCE_SRC_NODE"] = self.op.src_node
3399 env["INSTANCE_SRC_PATH"] = self.op.src_path
3400 env["INSTANCE_SRC_IMAGE"] = self.src_image
3402 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3403 primary_node=self.op.pnode,
3404 secondary_nodes=self.secondaries,
3405 status=self.instance_status,
3406 os_type=self.op.os_type,
3407 memory=self.op.mem_size,
3408 vcpus=self.op.vcpus,
3409 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3412 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3417 def CheckPrereq(self):
3418 """Check prerequisites.
3421 # set optional parameters to none if they don't exist
3422 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3423 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3424 "vnc_bind_address"]:
3425 if not hasattr(self.op, attr):
3426 setattr(self.op, attr, None)
3428 if self.op.mode not in (constants.INSTANCE_CREATE,
3429 constants.INSTANCE_IMPORT):
3430 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3433 if self.op.mode == constants.INSTANCE_IMPORT:
3434 src_node = getattr(self.op, "src_node", None)
3435 src_path = getattr(self.op, "src_path", None)
3436 if src_node is None or src_path is None:
3437 raise errors.OpPrereqError("Importing an instance requires source"
3438 " node and path options")
3439 src_node_full = self.cfg.ExpandNodeName(src_node)
3440 if src_node_full is None:
3441 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3442 self.op.src_node = src_node = src_node_full
3444 if not os.path.isabs(src_path):
3445 raise errors.OpPrereqError("The source path must be absolute")
3447 export_info = rpc.call_export_info(src_node, src_path)
3450 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3452 if not export_info.has_section(constants.INISECT_EXP):
3453 raise errors.ProgrammerError("Corrupted export config")
3455 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3456 if (int(ei_version) != constants.EXPORT_VERSION):
3457 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3458 (ei_version, constants.EXPORT_VERSION))
3460 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3461 raise errors.OpPrereqError("Can't import instance with more than"
3464 # FIXME: are the old os-es, disk sizes, etc. useful?
3465 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3466 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3468 self.src_image = diskimage
3469 else: # INSTANCE_CREATE
3470 if getattr(self.op, "os_type", None) is None:
3471 raise errors.OpPrereqError("No guest OS specified")
3473 #### instance parameters check
3475 # disk template and mirror node verification
3476 if self.op.disk_template not in constants.DISK_TEMPLATES:
3477 raise errors.OpPrereqError("Invalid disk template name")
3479 # instance name verification
3480 hostname1 = utils.HostInfo(self.op.instance_name)
3482 self.op.instance_name = instance_name = hostname1.name
3483 instance_list = self.cfg.GetInstanceList()
3484 if instance_name in instance_list:
3485 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3488 # ip validity checks
3489 ip = getattr(self.op, "ip", None)
3490 if ip is None or ip.lower() == "none":
3492 elif ip.lower() == "auto":
3493 inst_ip = hostname1.ip
3495 if not utils.IsValidIP(ip):
3496 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3497 " like a valid IP" % ip)
3499 self.inst_ip = self.op.ip = inst_ip
3501 if self.op.start and not self.op.ip_check:
3502 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3503 " adding an instance in start mode")
3505 if self.op.ip_check:
3506 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3507 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3508 (hostname1.ip, instance_name))
3510 # MAC address verification
3511 if self.op.mac != "auto":
3512 if not utils.IsValidMac(self.op.mac.lower()):
3513 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3516 # bridge verification
3517 bridge = getattr(self.op, "bridge", None)
3519 self.op.bridge = self.cfg.GetDefBridge()
3521 self.op.bridge = bridge
3523 # boot order verification
3524 if self.op.hvm_boot_order is not None:
3525 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3526 raise errors.OpPrereqError("invalid boot order specified,"
3527 " must be one or more of [acdn]")
3530 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3531 raise errors.OpPrereqError("One and only one of iallocator and primary"
3532 " node must be given")
3534 if self.op.iallocator is not None:
3535 self._RunAllocator()
3537 #### node related checks
3539 # check primary node
3540 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3542 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3544 self.op.pnode = pnode.name
3546 self.secondaries = []
3548 # mirror node verification
3549 if self.op.disk_template in constants.DTS_NET_MIRROR:
3550 if getattr(self.op, "snode", None) is None:
3551 raise errors.OpPrereqError("The networked disk templates need"
3554 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3555 if snode_name is None:
3556 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3558 elif snode_name == pnode.name:
3559 raise errors.OpPrereqError("The secondary node cannot be"
3560 " the primary node.")
3561 self.secondaries.append(snode_name)
3563 req_size = _ComputeDiskSize(self.op.disk_template,
3564 self.op.disk_size, self.op.swap_size)
3566 # Check lv size requirements
3567 if req_size is not None:
3568 nodenames = [pnode.name] + self.secondaries
3569 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3570 for node in nodenames:
3571 info = nodeinfo.get(node, None)
3573 raise errors.OpPrereqError("Cannot get current information"
3574 " from node '%s'" % node)
3575 vg_free = info.get('vg_free', None)
3576 if not isinstance(vg_free, int):
3577 raise errors.OpPrereqError("Can't compute free disk space on"
3579 if req_size > info['vg_free']:
3580 raise errors.OpPrereqError("Not enough disk space on target node %s."
3581 " %d MB available, %d MB required" %
3582 (node, info['vg_free'], req_size))
3585 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3587 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3588 " primary node" % self.op.os_type)
3590 if self.op.kernel_path == constants.VALUE_NONE:
3591 raise errors.OpPrereqError("Can't set instance kernel to none")
3594 # bridge check on primary node
3595 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3596 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3597 " destination node '%s'" %
3598 (self.op.bridge, pnode.name))
3600 # memory check on primary node
3602 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3603 "creating instance %s" % self.op.instance_name,
3606 # hvm_cdrom_image_path verification
3607 if self.op.hvm_cdrom_image_path is not None:
3608 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3609 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3610 " be an absolute path or None, not %s" %
3611 self.op.hvm_cdrom_image_path)
3612 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3613 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3614 " regular file or a symlink pointing to"
3615 " an existing regular file, not %s" %
3616 self.op.hvm_cdrom_image_path)
3618 # vnc_bind_address verification
3619 if self.op.vnc_bind_address is not None:
3620 if not utils.IsValidIP(self.op.vnc_bind_address):
3621 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3622 " like a valid IP address" %
3623 self.op.vnc_bind_address)
3626 self.instance_status = 'up'
3628 self.instance_status = 'down'
3630 def Exec(self, feedback_fn):
3631 """Create and add the instance to the cluster.
3634 instance = self.op.instance_name
3635 pnode_name = self.pnode.name
3637 if self.op.mac == "auto":
3638 mac_address = self.cfg.GenerateMAC()
3640 mac_address = self.op.mac
3642 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3643 if self.inst_ip is not None:
3644 nic.ip = self.inst_ip
3646 ht_kind = self.sstore.GetHypervisorType()
3647 if ht_kind in constants.HTS_REQ_PORT:
3648 network_port = self.cfg.AllocatePort()
3652 if self.op.vnc_bind_address is None:
3653 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3655 disks = _GenerateDiskTemplate(self.cfg,
3656 self.op.disk_template,
3657 instance, pnode_name,
3658 self.secondaries, self.op.disk_size,
3661 iobj = objects.Instance(name=instance, os=self.op.os_type,
3662 primary_node=pnode_name,
3663 memory=self.op.mem_size,
3664 vcpus=self.op.vcpus,
3665 nics=[nic], disks=disks,
3666 disk_template=self.op.disk_template,
3667 status=self.instance_status,
3668 network_port=network_port,
3669 kernel_path=self.op.kernel_path,
3670 initrd_path=self.op.initrd_path,
3671 hvm_boot_order=self.op.hvm_boot_order,
3672 hvm_acpi=self.op.hvm_acpi,
3673 hvm_pae=self.op.hvm_pae,
3674 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3675 vnc_bind_address=self.op.vnc_bind_address,
3678 feedback_fn("* creating instance disks...")
3679 if not _CreateDisks(self.cfg, iobj):
3680 _RemoveDisks(iobj, self.cfg)
3681 raise errors.OpExecError("Device creation failed, reverting...")
3683 feedback_fn("adding instance %s to cluster config" % instance)
3685 self.cfg.AddInstance(iobj)
3687 if self.op.wait_for_sync:
3688 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3689 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3690 # make sure the disks are not degraded (still sync-ing is ok)
3692 feedback_fn("* checking mirrors status")
3693 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3698 _RemoveDisks(iobj, self.cfg)
3699 self.cfg.RemoveInstance(iobj.name)
3700 raise errors.OpExecError("There are some degraded disks for"
3703 feedback_fn("creating os for instance %s on node %s" %
3704 (instance, pnode_name))
3706 if iobj.disk_template != constants.DT_DISKLESS:
3707 if self.op.mode == constants.INSTANCE_CREATE:
3708 feedback_fn("* running the instance OS create scripts...")
3709 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3710 raise errors.OpExecError("could not add os for instance %s"
3712 (instance, pnode_name))
3714 elif self.op.mode == constants.INSTANCE_IMPORT:
3715 feedback_fn("* running the instance OS import scripts...")
3716 src_node = self.op.src_node
3717 src_image = self.src_image
3718 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3719 src_node, src_image):
3720 raise errors.OpExecError("Could not import os for instance"
3722 (instance, pnode_name))
3724 # also checked in the prereq part
3725 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3729 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3730 feedback_fn("* starting instance...")
3731 if not rpc.call_instance_start(pnode_name, iobj, None):
3732 raise errors.OpExecError("Could not start instance")
3735 class LUConnectConsole(NoHooksLU):
3736 """Connect to an instance's console.
3738 This is somewhat special in that it returns the command line that
3739 you need to run on the master node in order to connect to the console.
3743 _OP_REQP = ["instance_name"]
3745 def CheckPrereq(self):
3746 """Check prerequisites.
3748 This checks that the instance is in the cluster.
3751 instance = self.cfg.GetInstanceInfo(
3752 self.cfg.ExpandInstanceName(self.op.instance_name))
3753 if instance is None:
3754 raise errors.OpPrereqError("Instance '%s' not known" %
3755 self.op.instance_name)
3756 self.instance = instance
3758 def Exec(self, feedback_fn):
3759 """Connect to the console of an instance
3762 instance = self.instance
3763 node = instance.primary_node
3765 node_insts = rpc.call_instance_list([node])[node]
3766 if node_insts is False:
3767 raise errors.OpExecError("Can't connect to node %s." % node)
3769 if instance.name not in node_insts:
3770 raise errors.OpExecError("Instance %s is not running." % instance.name)
3772 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3774 hyper = hypervisor.GetHypervisor()
3775 console_cmd = hyper.GetShellCommandForConsole(instance)
3777 argv = ["ssh", "-q", "-t"]
3778 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3779 argv.extend(ssh.BATCH_MODE_OPTS)
3781 argv.append(console_cmd)
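# The command assembled here is run by the client on the master node; for a
# Xen-based cluster it ends up looking roughly like (illustrative only):
#   ssh -q -t <known-hosts and batch-mode options> node1.example.com \
#     'xm console instance1.example.com'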
3785 class LUAddMDDRBDComponent(LogicalUnit):
3786 """Adda new mirror member to an instance's disk.
3789 HPATH = "mirror-add"
3790 HTYPE = constants.HTYPE_INSTANCE
3791 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3793 def BuildHooksEnv(self):
3796 This runs on the master, the primary and all the secondaries.
3800 "NEW_SECONDARY": self.op.remote_node,
3801 "DISK_NAME": self.op.disk_name,
3803 env.update(_BuildInstanceHookEnvByObject(self.instance))
3804 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3805 self.op.remote_node,] + list(self.instance.secondary_nodes)
3808 def CheckPrereq(self):
3809 """Check prerequisites.
3811 This checks that the instance is in the cluster.
3814 instance = self.cfg.GetInstanceInfo(
3815 self.cfg.ExpandInstanceName(self.op.instance_name))
3816 if instance is None:
3817 raise errors.OpPrereqError("Instance '%s' not known" %
3818 self.op.instance_name)
3819 self.instance = instance
3821 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3822 if remote_node is None:
3823 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3824 self.remote_node = remote_node
3826 if remote_node == instance.primary_node:
3827 raise errors.OpPrereqError("The specified node is the primary node of"
3830 if instance.disk_template != constants.DT_REMOTE_RAID1:
3831 raise errors.OpPrereqError("Instance's disk layout is not"
3833 for disk in instance.disks:
3834 if disk.iv_name == self.op.disk_name:
3837 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3838 " instance." % self.op.disk_name)
3839 if len(disk.children) > 1:
3840 raise errors.OpPrereqError("The device already has two slave devices."
3841 " This would create a 3-disk raid1 which we"
3845 def Exec(self, feedback_fn):
3846 """Add the mirror component
3850 instance = self.instance
3852 remote_node = self.remote_node
3853 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3854 names = _GenerateUniqueNames(self.cfg, lv_names)
3855 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3856 remote_node, disk.size, names)
3858 logger.Info("adding new mirror component on secondary")
3860 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3862 _GetInstanceInfoText(instance)):
3863 raise errors.OpExecError("Failed to create new component on secondary"
3864 " node %s" % remote_node)
3866 logger.Info("adding new mirror component on primary")
3868 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3870 _GetInstanceInfoText(instance)):
3871 # remove secondary dev
3872 self.cfg.SetDiskID(new_drbd, remote_node)
3873 rpc.call_blockdev_remove(remote_node, new_drbd)
3874 raise errors.OpExecError("Failed to create volume on primary")
3876 # the device exists now
3877 # call the primary node to add the mirror to md
3878 logger.Info("adding new mirror component to md")
3879 if not rpc.call_blockdev_addchildren(instance.primary_node,
3881 logger.Error("Can't add mirror compoment to md!")
3882 self.cfg.SetDiskID(new_drbd, remote_node)
3883 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3884 logger.Error("Can't rollback on secondary")
3885 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3886 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3887 logger.Error("Can't rollback on primary")
3888 raise errors.OpExecError("Can't add mirror component to md array")
3890 disk.children.append(new_drbd)
3892 self.cfg.AddInstance(instance)
3894 _WaitForSync(self.cfg, instance, self.proc)
3899 class LURemoveMDDRBDComponent(LogicalUnit):
3900 """Remove a component from a remote_raid1 disk.
3903 HPATH = "mirror-remove"
3904 HTYPE = constants.HTYPE_INSTANCE
3905 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3907 def BuildHooksEnv(self):
3910 This runs on the master, the primary and all the secondaries.
3914 "DISK_NAME": self.op.disk_name,
3915 "DISK_ID": self.op.disk_id,
3916 "OLD_SECONDARY": self.old_secondary,
3918 env.update(_BuildInstanceHookEnvByObject(self.instance))
3919 nl = [self.sstore.GetMasterNode(),
3920 self.instance.primary_node] + list(self.instance.secondary_nodes)
3923 def CheckPrereq(self):
3924 """Check prerequisites.
3926 This checks that the instance is in the cluster.
3929 instance = self.cfg.GetInstanceInfo(
3930 self.cfg.ExpandInstanceName(self.op.instance_name))
3931 if instance is None:
3932 raise errors.OpPrereqError("Instance '%s' not known" %
3933 self.op.instance_name)
3934 self.instance = instance
3936 if instance.disk_template != constants.DT_REMOTE_RAID1:
3937 raise errors.OpPrereqError("Instance's disk layout is not"
3939 for disk in instance.disks:
3940 if disk.iv_name == self.op.disk_name:
3943 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3944 " instance." % self.op.disk_name)
3945 for child in disk.children:
3946 if (child.dev_type == constants.LD_DRBD7 and
3947 child.logical_id[2] == self.op.disk_id):
3950 raise errors.OpPrereqError("Can't find the device with this port.")
3952 if len(disk.children) < 2:
3953 raise errors.OpPrereqError("Cannot remove the last component from"
3957 if self.child.logical_id[0] == instance.primary_node:
3961 self.old_secondary = self.child.logical_id[oid]
3963 def Exec(self, feedback_fn):
3964 """Remove the mirror component
3967 instance = self.instance
3970 logger.Info("remove mirror component")
3971 self.cfg.SetDiskID(disk, instance.primary_node)
3972 if not rpc.call_blockdev_removechildren(instance.primary_node,
3974 raise errors.OpExecError("Can't remove child from mirror.")
3976 for node in child.logical_id[:2]:
3977 self.cfg.SetDiskID(child, node)
3978 if not rpc.call_blockdev_remove(node, child):
3979 logger.Error("Warning: failed to remove device from node %s,"
3980 " continuing operation." % node)
3982 disk.children.remove(child)
3983 self.cfg.AddInstance(instance)
3986 class LUReplaceDisks(LogicalUnit):
3987 """Replace the disks of an instance.
3990 HPATH = "mirrors-replace"
3991 HTYPE = constants.HTYPE_INSTANCE
3992 _OP_REQP = ["instance_name", "mode", "disks"]
3994 def _RunAllocator(self):
3995 """Compute a new secondary node using an IAllocator.
3998 ial = IAllocator(self.cfg, self.sstore,
3999 mode=constants.IALLOCATOR_MODE_RELOC,
4000 name=self.op.instance_name,
4001 relocate_from=[self.sec_node])
4003 ial.Run(self.op.iallocator)
4006 raise errors.OpPrereqError("Can't compute nodes using"
4007 " iallocator '%s': %s" % (self.op.iallocator,
4009 if len(ial.nodes) != ial.required_nodes:
4010 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4011 " of nodes (%s), required %s" %
4012 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
4013 self.op.remote_node = ial.nodes[0]
4014 logger.ToStdout("Selected new secondary for the instance: %s" %
4015 self.op.remote_node)
4017 def BuildHooksEnv(self):
4020 This runs on the master, the primary and all the secondaries.
4024 "MODE": self.op.mode,
4025 "NEW_SECONDARY": self.op.remote_node,
4026 "OLD_SECONDARY": self.instance.secondary_nodes[0],
4028 env.update(_BuildInstanceHookEnvByObject(self.instance))
4030 self.sstore.GetMasterNode(),
4031 self.instance.primary_node,
4033 if self.op.remote_node is not None:
4034 nl.append(self.op.remote_node)
4037 def CheckPrereq(self):
4038 """Check prerequisites.
4040 This checks that the instance is in the cluster.
4043 if not hasattr(self.op, "remote_node"):
4044 self.op.remote_node = None
4046 instance = self.cfg.GetInstanceInfo(
4047 self.cfg.ExpandInstanceName(self.op.instance_name))
4048 if instance is None:
4049 raise errors.OpPrereqError("Instance '%s' not known" %
4050 self.op.instance_name)
4051 self.instance = instance
4052 self.op.instance_name = instance.name
4054 if instance.disk_template not in constants.DTS_NET_MIRROR:
4055 raise errors.OpPrereqError("Instance's disk layout is not"
4056 " network mirrored.")
4058 if len(instance.secondary_nodes) != 1:
4059 raise errors.OpPrereqError("The instance has a strange layout,"
4060 " expected one secondary but found %d" %
4061 len(instance.secondary_nodes))
4063 self.sec_node = instance.secondary_nodes[0]
4065 ia_name = getattr(self.op, "iallocator", None)
4066 if ia_name is not None:
4067 if self.op.remote_node is not None:
4068 raise errors.OpPrereqError("Give either the iallocator or the new"
4069 " secondary, not both")
4070 self.op.remote_node = self._RunAllocator()
4072 remote_node = self.op.remote_node
4073 if remote_node is not None:
4074 remote_node = self.cfg.ExpandNodeName(remote_node)
4075 if remote_node is None:
4076 raise errors.OpPrereqError("Node '%s' not known" %
4077 self.op.remote_node)
4078 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4080 self.remote_node_info = None
4081 if remote_node == instance.primary_node:
4082 raise errors.OpPrereqError("The specified node is the primary node of"
4084 elif remote_node == self.sec_node:
4085 if self.op.mode == constants.REPLACE_DISK_SEC:
4086 # this is for DRBD8, where we can't execute the same mode of
4087 # replacement as for drbd7 (no different port allocated)
4088 raise errors.OpPrereqError("Same secondary given, cannot execute"
4090 # the user gave the current secondary, switch to
4091 # 'no-replace-secondary' mode for drbd7
4093 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
4094 self.op.mode != constants.REPLACE_DISK_ALL):
4095 raise errors.OpPrereqError("Template 'remote_raid1' only allows"
4096 " replacing all disks, not individual ones")
4097 if instance.disk_template == constants.DT_DRBD8:
4098 if (self.op.mode == constants.REPLACE_DISK_ALL and
4099 remote_node is not None):
4100 # switch to replace secondary mode
4101 self.op.mode = constants.REPLACE_DISK_SEC
4103 if self.op.mode == constants.REPLACE_DISK_ALL:
4104 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4105 " secondary disk replacement, not"
4107 elif self.op.mode == constants.REPLACE_DISK_PRI:
4108 if remote_node is not None:
4109 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4110 " the secondary while doing a primary"
4111 " node disk replacement")
4112 self.tgt_node = instance.primary_node
4113 self.oth_node = instance.secondary_nodes[0]
4114 elif self.op.mode == constants.REPLACE_DISK_SEC:
4115 self.new_node = remote_node # this can be None, in which case
4116 # we don't change the secondary
4117 self.tgt_node = instance.secondary_nodes[0]
4118 self.oth_node = instance.primary_node
4120 raise errors.ProgrammerError("Unhandled disk replace mode")
4122 for name in self.op.disks:
4123 if instance.FindDisk(name) is None:
4124 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4125 (name, instance.name))
4126 self.op.remote_node = remote_node
4128 def _ExecRR1(self, feedback_fn):
4129 """Replace the disks of an instance.
4132 instance = self.instance
4135 if self.op.remote_node is None:
4136 remote_node = self.sec_node
4138 remote_node = self.op.remote_node
4140 for dev in instance.disks:
4142 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4143 names = _GenerateUniqueNames(cfg, lv_names)
4144 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
4145 remote_node, size, names)
4146 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
4147 logger.Info("adding new mirror component on secondary for %s" %
4150 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
4152 _GetInstanceInfoText(instance)):
4153 raise errors.OpExecError("Failed to create new component on secondary"
4154 " node %s. Full abort, cleanup manually!" %
4157 logger.Info("adding new mirror component on primary")
4159 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
4161 _GetInstanceInfoText(instance)):
4162 # remove secondary dev
4163 cfg.SetDiskID(new_drbd, remote_node)
4164 rpc.call_blockdev_remove(remote_node, new_drbd)
4165 raise errors.OpExecError("Failed to create volume on primary!"
4166 " Full abort, cleanup manually!!")
4168 # the device exists now
4169 # call the primary node to add the mirror to md
4170 logger.Info("adding new mirror component to md")
4171 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
4173 logger.Error("Can't add mirror component to md!")
4174 cfg.SetDiskID(new_drbd, remote_node)
4175 if not rpc.call_blockdev_remove(remote_node, new_drbd):
4176 logger.Error("Can't rollback on secondary")
4177 cfg.SetDiskID(new_drbd, instance.primary_node)
4178 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4179 logger.Error("Can't rollback on primary")
4180 raise errors.OpExecError("Full abort, clean up manually!!")
4182 dev.children.append(new_drbd)
4183 cfg.AddInstance(instance)
4185 # this can fail as the old devices are degraded and _WaitForSync
4186 # does a combined result over all disks, so we don't check its
4188 _WaitForSync(cfg, instance, self.proc, unlock=True)
4190 # so check manually all the devices
4191 for name in iv_names:
4192 dev, child, new_drbd = iv_names[name]
4193 cfg.SetDiskID(dev, instance.primary_node)
4194 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4196 raise errors.OpExecError("MD device %s is degraded!" % name)
4197 cfg.SetDiskID(new_drbd, instance.primary_node)
4198 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
4200 raise errors.OpExecError("New drbd device %s is degraded!" % name)
4202 for name in iv_names:
4203 dev, child, new_drbd = iv_names[name]
4204 logger.Info("remove mirror %s component" % name)
4205 cfg.SetDiskID(dev, instance.primary_node)
4206 if not rpc.call_blockdev_removechildren(instance.primary_node,
4208 logger.Error("Can't remove child from mirror, aborting"
4209 " *this device cleanup*.\nYou need to cleanup manually!!")
4212 for node in child.logical_id[:2]:
4213 logger.Info("remove child device on %s" % node)
4214 cfg.SetDiskID(child, node)
4215 if not rpc.call_blockdev_remove(node, child):
4216 logger.Error("Warning: failed to remove device from node %s,"
4217 " continuing operation." % node)
4219 dev.children.remove(child)
4221 cfg.AddInstance(instance)
4223 def _ExecD8DiskOnly(self, feedback_fn):
4224 """Replace a disk on the primary or secondary for dbrd8.
4226 The algorithm for replace is quite complicated:
4227 - for each disk to be replaced:
4228 - create new LVs on the target node with unique names
4229 - detach old LVs from the drbd device
4230 - rename old LVs to name_replaced.<time_t>
4231 - rename new LVs to old LVs
4232 - attach the new LVs (with the old names now) to the drbd device
4233 - wait for sync across all devices
4234 - for each modified disk:
4235 - remove old LVs (which have the name name_replaced.<time_t>)
4237 Failures are not very well handled.
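# Illustrative sketch only (not part of the original logic; all names are
# hypothetical): for a disk "sda" whose current children are the LVs
# "sda_data" and "sda_meta", and a replacement happening at time_t
# 1200000000, the steps above roughly amount to:
#
#   detach:   drbd "sda" loses children [sda_data, sda_meta]
#   rename 1: sda_data -> sda_data_replaced-1200000000 (same for sda_meta)
#   rename 2: the freshly created, uniquely named LVs take over the names
#             sda_data / sda_meta
#   attach:   drbd "sda" gets the new LVs (now carrying the old names)
#   cleanup:  after the resync, the *_replaced-<time_t> LVs are removed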
4241 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4242 instance = self.instance
4244 vgname = self.cfg.GetVGName()
4247 tgt_node = self.tgt_node
4248 oth_node = self.oth_node
4250 # Step: check device activation
4251 self.proc.LogStep(1, steps_total, "check device existence")
4252 info("checking volume groups")
4253 my_vg = cfg.GetVGName()
4254 results = rpc.call_vg_list([oth_node, tgt_node])
4256 raise errors.OpExecError("Can't list volume groups on the nodes")
4257 for node in oth_node, tgt_node:
4258 res = results.get(node, False)
4259 if not res or my_vg not in res:
4260 raise errors.OpExecError("Volume group '%s' not found on %s" %
4262 for dev in instance.disks:
4263 if not dev.iv_name in self.op.disks:
4265 for node in tgt_node, oth_node:
4266 info("checking %s on %s" % (dev.iv_name, node))
4267 cfg.SetDiskID(dev, node)
4268 if not rpc.call_blockdev_find(node, dev):
4269 raise errors.OpExecError("Can't find device %s on node %s" %
4270 (dev.iv_name, node))
4272 # Step: check other node consistency
4273 self.proc.LogStep(2, steps_total, "check peer consistency")
4274 for dev in instance.disks:
4275 if not dev.iv_name in self.op.disks:
4277 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4278 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4279 oth_node==instance.primary_node):
4280 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4281 " to replace disks on this node (%s)" %
4282 (oth_node, tgt_node))
4284 # Step: create new storage
4285 self.proc.LogStep(3, steps_total, "allocate new storage")
4286 for dev in instance.disks:
4287 if not dev.iv_name in self.op.disks:
4290 cfg.SetDiskID(dev, tgt_node)
4291 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4292 names = _GenerateUniqueNames(cfg, lv_names)
4293 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4294 logical_id=(vgname, names[0]))
4295 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4296 logical_id=(vgname, names[1]))
4297 new_lvs = [lv_data, lv_meta]
4298 old_lvs = dev.children
4299 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4300 info("creating new local storage on %s for %s" %
4301 (tgt_node, dev.iv_name))
4302 # since we *always* want to create this LV, we use the
4303 # _Create...OnPrimary (which forces the creation), even if we
4304 # are talking about the secondary node
4305 for new_lv in new_lvs:
4306 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4307 _GetInstanceInfoText(instance)):
4308 raise errors.OpExecError("Failed to create new LV named '%s' on"
4310 (new_lv.logical_id[1], tgt_node))
4312 # Step: for each lv, detach+rename*2+attach
4313 self.proc.LogStep(4, steps_total, "change drbd configuration")
4314 for dev, old_lvs, new_lvs in iv_names.itervalues():
4315 info("detaching %s drbd from local storage" % dev.iv_name)
4316 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4317 raise errors.OpExecError("Can't detach drbd from local storage on node"
4318 " %s for device %s" % (tgt_node, dev.iv_name))
4320 #cfg.Update(instance)
4322 # ok, we created the new LVs, so now we know we have the needed
4323 # storage; as such, we proceed on the target node to rename
4324 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4325 # using the assumption that logical_id == physical_id (which in
4326 # turn is the unique_id on that node)
4328 # FIXME(iustin): use a better name for the replaced LVs
4329 temp_suffix = int(time.time())
4330 ren_fn = lambda d, suff: (d.physical_id[0],
4331 d.physical_id[1] + "_replaced-%s" % suff)
4332 # build the rename list based on what LVs exist on the node
4334 for to_ren in old_lvs:
4335 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4336 if find_res is not None: # device exists
4337 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4339 info("renaming the old LVs on the target node")
4340 if not rpc.call_blockdev_rename(tgt_node, rlist):
4341 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4342 # now we rename the new LVs to the old LVs
4343 info("renaming the new LVs on the target node")
4344 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4345 if not rpc.call_blockdev_rename(tgt_node, rlist):
4346 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4348 for old, new in zip(old_lvs, new_lvs):
4349 new.logical_id = old.logical_id
4350 cfg.SetDiskID(new, tgt_node)
4352 for disk in old_lvs:
4353 disk.logical_id = ren_fn(disk, temp_suffix)
4354 cfg.SetDiskID(disk, tgt_node)
4356 # now that the new lvs have the old name, we can add them to the device
4357 info("adding new mirror component on %s" % tgt_node)
4358 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4359 for new_lv in new_lvs:
4360 if not rpc.call_blockdev_remove(tgt_node, new_lv):
4361 warning("Can't rollback device %s" % new_lv.logical_id[1], hint="manually clean up unused"
4363 raise errors.OpExecError("Can't add local storage to drbd")
4365 dev.children = new_lvs
4366 cfg.Update(instance)
4368 # Step: wait for sync
4370 # this can fail as the old devices are degraded and _WaitForSync
4371 # does a combined result over all disks, so we don't check its
4373 self.proc.LogStep(5, steps_total, "sync devices")
4374 _WaitForSync(cfg, instance, self.proc, unlock=True)
4376 # so check manually all the devices
4377 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4378 cfg.SetDiskID(dev, instance.primary_node)
4379 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4381 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4383 # Step: remove old storage
4384 self.proc.LogStep(6, steps_total, "removing old storage")
4385 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4386 info("remove logical volumes for %s" % name)
4388 cfg.SetDiskID(lv, tgt_node)
4389 if not rpc.call_blockdev_remove(tgt_node, lv):
4390 warning("Can't remove old LV", hint="manually remove unused LVs")
4393 def _ExecD8Secondary(self, feedback_fn):
4394 """Replace the secondary node for drbd8.
4396 The algorithm for replace is quite complicated:
4397 - for all disks of the instance:
4398 - create new LVs on the new node with same names
4399 - shutdown the drbd device on the old secondary
4400 - disconnect the drbd network on the primary
4401 - create the drbd device on the new secondary
4402 - network attach the drbd on the primary, using an artifice:
4403 the drbd code for Attach() will connect to the network if it
4404 finds a device which is connected to the good local disks but
4406 - wait for sync across all devices
4407 - remove all disks from the old secondary
4409 Failures are not very well handled.
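# Illustrative sketch only (identifiers hypothetical, not part of the
# original logic): for DRBD8 the logical_id is roughly
# (primary_node, secondary_node, port), so the network switch below is:
#
#   dev.physical_id = (None,) * len(dev.physical_id)  # forget the old peer
#   rpc.call_blockdev_find(pri_node, dev)             # device goes standalone
#   dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
#   cfg.SetDiskID(dev, pri_node)                      # recompute physical_id
#   rpc.call_blockdev_find(pri_node, dev)             # reconnect to new peer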
4413 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4414 instance = self.instance
4416 vgname = self.cfg.GetVGName()
4419 old_node = self.tgt_node
4420 new_node = self.new_node
4421 pri_node = instance.primary_node
4423 # Step: check device activation
4424 self.proc.LogStep(1, steps_total, "check device existence")
4425 info("checking volume groups")
4426 my_vg = cfg.GetVGName()
4427 results = rpc.call_vg_list([pri_node, new_node])
4429 raise errors.OpExecError("Can't list volume groups on the nodes")
4430 for node in pri_node, new_node:
4431 res = results.get(node, False)
4432 if not res or my_vg not in res:
4433 raise errors.OpExecError("Volume group '%s' not found on %s" %
4435 for dev in instance.disks:
4436 if not dev.iv_name in self.op.disks:
4438 info("checking %s on %s" % (dev.iv_name, pri_node))
4439 cfg.SetDiskID(dev, pri_node)
4440 if not rpc.call_blockdev_find(pri_node, dev):
4441 raise errors.OpExecError("Can't find device %s on node %s" %
4442 (dev.iv_name, pri_node))
4444 # Step: check other node consistency
4445 self.proc.LogStep(2, steps_total, "check peer consistency")
4446 for dev in instance.disks:
4447 if not dev.iv_name in self.op.disks:
4449 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4450 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4451 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4452 " unsafe to replace the secondary" %
4455 # Step: create new storage
4456 self.proc.LogStep(3, steps_total, "allocate new storage")
4457 for dev in instance.disks:
4459 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4460 # since we *always* want to create this LV, we use the
4461 # _Create...OnPrimary (which forces the creation), even if we
4462 # are talking about the secondary node
4463 for new_lv in dev.children:
4464 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4465 _GetInstanceInfoText(instance)):
4466 raise errors.OpExecError("Failed to create new LV named '%s' on"
4468 (new_lv.logical_id[1], new_node))
4470 iv_names[dev.iv_name] = (dev, dev.children)
4472 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4473 for dev in instance.disks:
4475 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4476 # create new devices on new_node
4477 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4478 logical_id=(pri_node, new_node,
4480 children=dev.children)
4481 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4483 _GetInstanceInfoText(instance)):
4484 raise errors.OpExecError("Failed to create new DRBD on"
4485 " node '%s'" % new_node)
4487 for dev in instance.disks:
4488 # we have new devices, shutdown the drbd on the old secondary
4489 info("shutting down drbd for %s on old node" % dev.iv_name)
4490 cfg.SetDiskID(dev, old_node)
4491 if not rpc.call_blockdev_shutdown(old_node, dev):
4492 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4493 hint="Please clean up this device manually as soon as possible")
4495 info("detaching primary drbds from the network (=> standalone)")
4497 for dev in instance.disks:
4498 cfg.SetDiskID(dev, pri_node)
4499 # set the physical (unique in bdev terms) id to None, meaning
4500 # detach from network
4501 dev.physical_id = (None,) * len(dev.physical_id)
4502 # and 'find' the device, which will 'fix' it to match the
4504 if rpc.call_blockdev_find(pri_node, dev):
4507 warning("Failed to detach drbd %s from network, unusual case" %
4511 # no detaches succeeded (very unlikely)
4512 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4514 # if we managed to detach at least one, we update all the disks of
4515 # the instance to point to the new secondary
4516 info("updating instance configuration")
4517 for dev in instance.disks:
4518 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4519 cfg.SetDiskID(dev, pri_node)
4520 cfg.Update(instance)
4522 # and now perform the drbd attach
4523 info("attaching primary drbds to new secondary (standalone => connected)")
4525 for dev in instance.disks:
4526 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4527 # since the attach is smart, it's enough to 'find' the device,
4528 # it will automatically activate the network, if the physical_id
4530 cfg.SetDiskID(dev, pri_node)
4531 if not rpc.call_blockdev_find(pri_node, dev):
4532 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4533 "please do a gnt-instance info to see the status of disks")
4535 # this can fail as the old devices are degraded and _WaitForSync
4536 # does a combined result over all disks, so we don't check its
4538 self.proc.LogStep(5, steps_total, "sync devices")
4539 _WaitForSync(cfg, instance, self.proc, unlock=True)
4541 # so check manually all the devices
4542 for name, (dev, old_lvs) in iv_names.iteritems():
4543 cfg.SetDiskID(dev, pri_node)
4544 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4546 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4548 self.proc.LogStep(6, steps_total, "removing old storage")
4549 for name, (dev, old_lvs) in iv_names.iteritems():
4550 info("remove logical volumes for %s" % name)
4552 cfg.SetDiskID(lv, old_node)
4553 if not rpc.call_blockdev_remove(old_node, lv):
4554 warning("Can't remove LV on old secondary",
4555 hint="Cleanup stale volumes by hand")
4557 def Exec(self, feedback_fn):
4558 """Execute disk replacement.
4560 This dispatches the disk replacement to the appropriate handler.
4563 instance = self.instance
4565 # Activate the instance disks if we're replacing them on a down instance
4566 if instance.status == "down":
4567 op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
4568 self.proc.ChainOpCode(op)
4570 if instance.disk_template == constants.DT_REMOTE_RAID1:
4572 elif instance.disk_template == constants.DT_DRBD8:
4573 if self.op.remote_node is None:
4574 fn = self._ExecD8DiskOnly
4576 fn = self._ExecD8Secondary
4578 raise errors.ProgrammerError("Unhandled disk replacement case")
4580 ret = fn(feedback_fn)
4582 # Deactivate the instance disks if we're replacing them on a down instance
4583 if instance.status == "down":
4584 op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
4585 self.proc.ChainOpCode(op)
4590 class LUGrowDisk(LogicalUnit):
4591 """Grow a disk of an instance.
4595 HTYPE = constants.HTYPE_INSTANCE
4596 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4598 def BuildHooksEnv(self):
4601 This runs on the master, the primary and all the secondaries.
4605 "DISK": self.op.disk,
4606 "AMOUNT": self.op.amount,
4608 env.update(_BuildInstanceHookEnvByObject(self.instance))
4610 self.sstore.GetMasterNode(),
4611 self.instance.primary_node,
4615 def CheckPrereq(self):
4616 """Check prerequisites.
4618 This checks that the instance is in the cluster.
4621 instance = self.cfg.GetInstanceInfo(
4622 self.cfg.ExpandInstanceName(self.op.instance_name))
4623 if instance is None:
4624 raise errors.OpPrereqError("Instance '%s' not known" %
4625 self.op.instance_name)
4627 if self.op.amount <= 0:
4628 raise errors.OpPrereqError("Invalid grow-by amount: %s" % self.op.amount)
4630 self.instance = instance
4631 self.op.instance_name = instance.name
4633 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4634 raise errors.OpPrereqError("Instance's disk layout does not support"
4637 self.disk = instance.FindDisk(self.op.disk)
4638 if self.disk is None:
4639 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4640 (self.op.disk, instance.name))
4642 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4643 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4644 for node in nodenames:
4645 info = nodeinfo.get(node, None)
4647 raise errors.OpPrereqError("Cannot get current information"
4648 " from node '%s'" % node)
4649 vg_free = info.get('vg_free', None)
4650 if not isinstance(vg_free, int):
4651 raise errors.OpPrereqError("Can't compute free disk space on"
4653 if self.op.amount > info['vg_free']:
4654 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4655 " %d MiB available, %d MiB required" %
4656 (node, info['vg_free'], self.op.amount))
4657 is_primary = (node == instance.primary_node)
4658 if not _CheckDiskConsistency(self.cfg, self.disk, node, is_primary):
4659 raise errors.OpPrereqError("Disk %s is degraded or not fully"
4660 " synchronized on node %s,"
4661 " aborting grow." % (self.op.disk, node))
4663 def Exec(self, feedback_fn):
4664 """Execute disk grow.
4667 instance = self.instance
4669 for node in (instance.secondary_nodes + (instance.primary_node,)):
4670 self.cfg.SetDiskID(disk, node)
4671 result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4672 if not result or not isinstance(result, tuple) or len(result) != 2:
4673 raise errors.OpExecError("Grow request failed on node %s" % node)
4675 raise errors.OpExecError("Grow request failed on node %s: %s" %
4677 disk.RecordGrow(self.op.amount)
4678 self.cfg.Update(instance)
4679 if self.op.wait_for_sync:
4680 disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4682 logger.Error("Warning: disk syncing has not returned a good status.\n"
4683 " Please check the instance.")
4686 class LUQueryInstanceData(NoHooksLU):
4687 """Query runtime instance data.
4690 _OP_REQP = ["instances"]
4692 def CheckPrereq(self):
4693 """Check prerequisites.
4695 This only checks the optional instance list against the existing names.
4698 if not isinstance(self.op.instances, list):
4699 raise errors.OpPrereqError("Invalid argument type 'instances'")
4700 if self.op.instances:
4701 self.wanted_instances = []
4702 names = self.op.instances
4704 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4705 if instance is None:
4706 raise errors.OpPrereqError("No such instance name '%s'" % name)
4707 self.wanted_instances.append(instance)
4709 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4710 in self.cfg.GetInstanceList()]
4714 def _ComputeDiskStatus(self, instance, snode, dev):
4715 """Compute block device status.
4718 self.cfg.SetDiskID(dev, instance.primary_node)
4719 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4720 if dev.dev_type in constants.LDS_DRBD:
4721 # we change the snode then (otherwise we use the one passed in)
4722 if dev.logical_id[0] == instance.primary_node:
4723 snode = dev.logical_id[1]
4725 snode = dev.logical_id[0]
4728 self.cfg.SetDiskID(dev, snode)
4729 dev_sstatus = rpc.call_blockdev_find(snode, dev)
4734 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4735 for child in dev.children]
4740 "iv_name": dev.iv_name,
4741 "dev_type": dev.dev_type,
4742 "logical_id": dev.logical_id,
4743 "physical_id": dev.physical_id,
4744 "pstatus": dev_pstatus,
4745 "sstatus": dev_sstatus,
4746 "children": dev_children,
4751 def Exec(self, feedback_fn):
4752 """Gather and return data"""
4754 for instance in self.wanted_instances:
4755 remote_info = rpc.call_instance_info(instance.primary_node,
4757 if remote_info and "state" in remote_info:
4760 remote_state = "down"
4761 if instance.status == "down":
4762 config_state = "down"
4766 disks = [self._ComputeDiskStatus(instance, None, device)
4767 for device in instance.disks]
4770 "name": instance.name,
4771 "config_state": config_state,
4772 "run_state": remote_state,
4773 "pnode": instance.primary_node,
4774 "snodes": instance.secondary_nodes,
4776 "memory": instance.memory,
4777 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4779 "vcpus": instance.vcpus,
4782 htkind = self.sstore.GetHypervisorType()
4783 if htkind == constants.HT_XEN_PVM30:
4784 idict["kernel_path"] = instance.kernel_path
4785 idict["initrd_path"] = instance.initrd_path
4787 if htkind == constants.HT_XEN_HVM31:
4788 idict["hvm_boot_order"] = instance.hvm_boot_order
4789 idict["hvm_acpi"] = instance.hvm_acpi
4790 idict["hvm_pae"] = instance.hvm_pae
4791 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4793 if htkind in constants.HTS_REQ_PORT:
4794 idict["vnc_bind_address"] = instance.vnc_bind_address
4795 idict["network_port"] = instance.network_port
4797 result[instance.name] = idict
4802 class LUSetInstanceParms(LogicalUnit):
4803 """Modifies an instances's parameters.
4806 HPATH = "instance-modify"
4807 HTYPE = constants.HTYPE_INSTANCE
4808 _OP_REQP = ["instance_name"]
4810 def BuildHooksEnv(self):
4813 This runs on the master, primary and secondaries.
4818 args['memory'] = self.mem
4820 args['vcpus'] = self.vcpus
4821 if self.do_ip or self.do_bridge or self.mac:
4825 ip = self.instance.nics[0].ip
4827 bridge = self.bridge
4829 bridge = self.instance.nics[0].bridge
4833 mac = self.instance.nics[0].mac
4834 args['nics'] = [(ip, bridge, mac)]
4835 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4836 nl = [self.sstore.GetMasterNode(),
4837 self.instance.primary_node] + list(self.instance.secondary_nodes)
4840 def CheckPrereq(self):
4841 """Check prerequisites.
4843 This only checks the instance list against the existing names.
4846 self.mem = getattr(self.op, "mem", None)
4847 self.vcpus = getattr(self.op, "vcpus", None)
4848 self.ip = getattr(self.op, "ip", None)
4849 self.mac = getattr(self.op, "mac", None)
4850 self.bridge = getattr(self.op, "bridge", None)
4851 self.kernel_path = getattr(self.op, "kernel_path", None)
4852 self.initrd_path = getattr(self.op, "initrd_path", None)
4853 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4854 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4855 self.hvm_pae = getattr(self.op, "hvm_pae", None)
4856 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4857 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4858 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4859 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4860 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4861 self.vnc_bind_address]
4862 if all_parms.count(None) == len(all_parms):
4863 raise errors.OpPrereqError("No changes submitted")
4864 if self.mem is not None:
4866 self.mem = int(self.mem)
4867 except ValueError, err:
4868 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4869 if self.vcpus is not None:
4871 self.vcpus = int(self.vcpus)
4872 except ValueError, err:
4873 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4874 if self.ip is not None:
4876 if self.ip.lower() == "none":
4879 if not utils.IsValidIP(self.ip):
4880 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4883 self.do_bridge = (self.bridge is not None)
4884 if self.mac is not None:
4885 if self.cfg.IsMacInUse(self.mac):
4886 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4888 if not utils.IsValidMac(self.mac):
4889 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4891 if self.kernel_path is not None:
4892 self.do_kernel_path = True
4893 if self.kernel_path == constants.VALUE_NONE:
4894 raise errors.OpPrereqError("Can't set instance to no kernel")
4896 if self.kernel_path != constants.VALUE_DEFAULT:
4897 if not os.path.isabs(self.kernel_path):
4898 raise errors.OpPrereqError("The kernel path must be an absolute"
4901 self.do_kernel_path = False
4903 if self.initrd_path is not None:
4904 self.do_initrd_path = True
4905 if self.initrd_path not in (constants.VALUE_NONE,
4906 constants.VALUE_DEFAULT):
4907 if not os.path.isabs(self.initrd_path):
4908 raise errors.OpPrereqError("The initrd path must be an absolute"
4911 self.do_initrd_path = False
4913 # boot order verification
4914 if self.hvm_boot_order is not None:
4915 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4916 if len(self.hvm_boot_order.strip("acdn")) != 0:
4917 raise errors.OpPrereqError("invalid boot order specified,"
4918 " must be one or more of [acdn]"
4921 # hvm_cdrom_image_path verification
4922 if self.op.hvm_cdrom_image_path is not None:
4923 if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4924 self.op.hvm_cdrom_image_path.lower() == "none"):
4925 raise errors.OpPrereqError("The path to the HVM CDROM image must"
4926 " be an absolute path or None, not %s" %
4927 self.op.hvm_cdrom_image_path)
4928 if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4929 self.op.hvm_cdrom_image_path.lower() == "none"):
4930 raise errors.OpPrereqError("The HVM CDROM image must either be a"
4931 " regular file or a symlink pointing to"
4932 " an existing regular file, not %s" %
4933 self.op.hvm_cdrom_image_path)
4935 # vnc_bind_address verification
4936 if self.op.vnc_bind_address is not None:
4937 if not utils.IsValidIP(self.op.vnc_bind_address):
4938 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4939 " like a valid IP address" %
4940 self.op.vnc_bind_address)
4942 instance = self.cfg.GetInstanceInfo(
4943 self.cfg.ExpandInstanceName(self.op.instance_name))
4944 if instance is None:
4945 raise errors.OpPrereqError("No such instance name '%s'" %
4946 self.op.instance_name)
4947 self.op.instance_name = instance.name
4948 self.instance = instance
4951 def Exec(self, feedback_fn):
4952 """Modifies an instance.
4954 All parameters take effect only at the next restart of the instance.
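# Hedged example (values hypothetical): a request changing only the memory
# and the bridge would build up a result list such as
#   [("mem", 512), ("bridge", "xen-br1")]
# describing the parameters that were actually modified.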
4957 instance = self.instance
4959 instance.memory = self.mem
4960 result.append(("mem", self.mem))
4962 instance.vcpus = self.vcpus
4963 result.append(("vcpus", self.vcpus))
4965 instance.nics[0].ip = self.ip
4966 result.append(("ip", self.ip))
4968 instance.nics[0].bridge = self.bridge
4969 result.append(("bridge", self.bridge))
4971 instance.nics[0].mac = self.mac
4972 result.append(("mac", self.mac))
4973 if self.do_kernel_path:
4974 instance.kernel_path = self.kernel_path
4975 result.append(("kernel_path", self.kernel_path))
4976 if self.do_initrd_path:
4977 instance.initrd_path = self.initrd_path
4978 result.append(("initrd_path", self.initrd_path))
4979 if self.hvm_boot_order:
4980 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4981 instance.hvm_boot_order = None
4983 instance.hvm_boot_order = self.hvm_boot_order
4984 result.append(("hvm_boot_order", self.hvm_boot_order))
4985 if self.hvm_acpi is not None:
4986 instance.hvm_acpi = self.hvm_acpi
4987 result.append(("hvm_acpi", self.hvm_acpi))
4988 if self.hvm_pae is not None:
4989 instance.hvm_pae = self.hvm_pae
4990 result.append(("hvm_pae", self.hvm_pae))
4991 if self.hvm_cdrom_image_path:
4992 if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4993 instance.hvm_cdrom_image_path = None
4995 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4996 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4997 if self.vnc_bind_address:
4998 instance.vnc_bind_address = self.vnc_bind_address
4999 result.append(("vnc_bind_address", self.vnc_bind_address))
5001 self.cfg.AddInstance(instance)
5006 class LUQueryExports(NoHooksLU):
5007 """Query the exports list
5012 def CheckPrereq(self):
5013 """Check that the nodelist contains only existing nodes.
5016 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
5018 def Exec(self, feedback_fn):
5019 """Compute the list of all the exported system images.
5022 a dictionary with the structure node->(export-list)
5023 where export-list is a list of the instances exported on
5027 return rpc.call_export_list(self.nodes)
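# The call above returns, for example (node and instance names hypothetical):
#   {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
#    "node2.example.com": []}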
5030 class LUExportInstance(LogicalUnit):
5031 """Export an instance to an image in the cluster.
5034 HPATH = "instance-export"
5035 HTYPE = constants.HTYPE_INSTANCE
5036 _OP_REQP = ["instance_name", "target_node", "shutdown"]
5038 def BuildHooksEnv(self):
5041 This will run on the master, primary node and target node.
5045 "EXPORT_NODE": self.op.target_node,
5046 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5048 env.update(_BuildInstanceHookEnvByObject(self.instance))
5049 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
5050 self.op.target_node]
5053 def CheckPrereq(self):
5054 """Check prerequisites.
5056 This checks that the instance and node names are valid.
5059 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5060 self.instance = self.cfg.GetInstanceInfo(instance_name)
5061 if self.instance is None:
5062 raise errors.OpPrereqError("Instance '%s' not found" %
5063 self.op.instance_name)
5066 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
5067 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
5069 if self.dst_node is None:
5070 raise errors.OpPrereqError("Destination node '%s' is unknown." %
5071 self.op.target_node)
5072 self.op.target_node = self.dst_node.name
5074 def Exec(self, feedback_fn):
5075 """Export an instance to an image in the cluster.
5078 instance = self.instance
5079 dst_node = self.dst_node
5080 src_node = instance.primary_node
5081 if self.op.shutdown:
5082 # shutdown the instance, but not the disks
5083 if not rpc.call_instance_shutdown(src_node, instance):
5084 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5085 (instance.name, src_node))
5087 vgname = self.cfg.GetVGName()
5092 for disk in instance.disks:
5093 if disk.iv_name == "sda":
5094 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5095 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
5097 if not new_dev_name:
5098 logger.Error("could not snapshot block device %s on node %s" %
5099 (disk.logical_id[1], src_node))
5101 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5102 logical_id=(vgname, new_dev_name),
5103 physical_id=(vgname, new_dev_name),
5104 iv_name=disk.iv_name)
5105 snap_disks.append(new_dev)
5108 if self.op.shutdown and instance.status == "up":
5109 if not rpc.call_instance_start(src_node, instance, None):
5110 _ShutdownInstanceDisks(instance, self.cfg)
5111 raise errors.OpExecError("Could not start instance")
5113 # TODO: check for size
5115 for dev in snap_disks:
5116 if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
5118 logger.Error("could not export block device %s from node"
5120 (dev.logical_id[1], src_node, dst_node.name))
5121 if not rpc.call_blockdev_remove(src_node, dev):
5122 logger.Error("could not remove snapshot block device %s from"
5123 " node %s" % (dev.logical_id[1], src_node))
5125 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5126 logger.Error("could not finalize export for instance %s on node %s" %
5127 (instance.name, dst_node.name))
5129 nodelist = self.cfg.GetNodeList()
5130 nodelist.remove(dst_node.name)
5132 # on one-node clusters the nodelist will be empty after the removal;
5133 # if we proceed, the backup would be removed because OpQueryExports
5134 # substitutes an empty list with the full cluster node list.
5136 op = opcodes.OpQueryExports(nodes=nodelist)
5137 exportlist = self.proc.ChainOpCode(op)
5138 for node in exportlist:
5139 if instance.name in exportlist[node]:
5140 if not rpc.call_export_remove(node, instance.name):
5141 logger.Error("could not remove older export for instance %s"
5142 " on node %s" % (instance.name, node))
5145 class LURemoveExport(NoHooksLU):
5146 """Remove exports related to the named instance.
5149 _OP_REQP = ["instance_name"]
5151 def CheckPrereq(self):
5152 """Check prerequisites.
5156 def Exec(self, feedback_fn):
5157 """Remove any export.
5160 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5161 # If the instance was not found we'll try with the name that was passed in.
5162 # This will only work if it was an FQDN, though.
5164 if not instance_name:
5166 instance_name = self.op.instance_name
5168 op = opcodes.OpQueryExports(nodes=[])
5169 exportlist = self.proc.ChainOpCode(op)
5171 for node in exportlist:
5172 if instance_name in exportlist[node]:
5174 if not rpc.call_export_remove(node, instance_name):
5175 logger.Error("could not remove export for instance %s"
5176 " on node %s" % (instance_name, node))
5178 if fqdn_warn and not found:
5179 feedback_fn("Export not found. If trying to remove an export belonging"
5180 " to a deleted instance please use its Fully Qualified"
5184 class TagsLU(NoHooksLU):
5187 This is an abstract class which is the parent of all the other tags LUs.
5190 def CheckPrereq(self):
5191 """Check prerequisites.
5194 if self.op.kind == constants.TAG_CLUSTER:
5195 self.target = self.cfg.GetClusterInfo()
5196 elif self.op.kind == constants.TAG_NODE:
5197 name = self.cfg.ExpandNodeName(self.op.name)
5199 raise errors.OpPrereqError("Invalid node name (%s)" %
5202 self.target = self.cfg.GetNodeInfo(name)
5203 elif self.op.kind == constants.TAG_INSTANCE:
5204 name = self.cfg.ExpandInstanceName(self.op.name)
5206 raise errors.OpPrereqError("Invalid instance name (%s)" %
5209 self.target = self.cfg.GetInstanceInfo(name)
5211 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5215 class LUGetTags(TagsLU):
5216 """Returns the tags of a given object.
5219 _OP_REQP = ["kind", "name"]
5221 def Exec(self, feedback_fn):
5222 """Returns the tag list.
5225 return self.target.GetTags()
5228 class LUSearchTags(NoHooksLU):
5229 """Searches the tags for a given pattern.
5232 _OP_REQP = ["pattern"]
5234 def CheckPrereq(self):
5235 """Check prerequisites.
5237 This checks the pattern passed for validity by compiling it.
5241 self.re = re.compile(self.op.pattern)
5242 except re.error, err:
5243 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5244 (self.op.pattern, err))
5246 def Exec(self, feedback_fn):
5247 """Returns the tag list.
5251 tgts = [("/cluster", cfg.GetClusterInfo())]
5252 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
5253 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5254 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
5255 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5257 for path, target in tgts:
5258 for tag in target.GetTags():
5259 if self.re.search(tag):
5260 results.append((path, tag))
5264 class LUAddTags(TagsLU):
5265 """Sets a tag on a given object.
5268 _OP_REQP = ["kind", "name", "tags"]
5270 def CheckPrereq(self):
5271 """Check prerequisites.
5273 This checks the type and length of the tag name and value.
5276 TagsLU.CheckPrereq(self)
5277 for tag in self.op.tags:
5278 objects.TaggableObject.ValidateTag(tag)
5280 def Exec(self, feedback_fn):
5285 for tag in self.op.tags:
5286 self.target.AddTag(tag)
5287 except errors.TagError, err:
5288 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5290 self.cfg.Update(self.target)
5291 except errors.ConfigurationError:
5292 raise errors.OpRetryError("There has been a modification to the"
5293 " config file and the operation has been"
5294 " aborted. Please retry.")
5297 class LUDelTags(TagsLU):
5298 """Delete a list of tags from a given object.
5301 _OP_REQP = ["kind", "name", "tags"]
5303 def CheckPrereq(self):
5304 """Check prerequisites.
5306 This checks that we have the given tag.
5309 TagsLU.CheckPrereq(self)
5310 for tag in self.op.tags:
5311 objects.TaggableObject.ValidateTag(tag, removal=True)
5312 del_tags = frozenset(self.op.tags)
5313 cur_tags = self.target.GetTags()
5314 if not del_tags <= cur_tags:
5315 diff_tags = del_tags - cur_tags
5316 diff_names = ["'%s'" % tag for tag in diff_tags]
5318 raise errors.OpPrereqError("Tag(s) %s not found" %
5319 (",".join(diff_names)))
5321 def Exec(self, feedback_fn):
5322 """Remove the tag from the object.
5325 for tag in self.op.tags:
5326 self.target.RemoveTag(tag)
5328 self.cfg.Update(self.target)
5329 except errors.ConfigurationError:
5330 raise errors.OpRetryError("There has been a modification to the"
5331 " config file and the operation has been"
5332 " aborted. Please retry.")
5334 class LUTestDelay(NoHooksLU):
5335 """Sleep for a specified amount of time.
5337 This LU sleeps on the master and/or nodes for a specified amount of
5341 _OP_REQP = ["duration", "on_master", "on_nodes"]
5343 def CheckPrereq(self):
5344 """Check prerequisites.
5346 This checks that we have a good list of nodes and/or the duration
5351 if self.op.on_nodes:
5352 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5354 def Exec(self, feedback_fn):
5355 """Do the actual sleep.
5358 if self.op.on_master:
5359 if not utils.TestDelay(self.op.duration):
5360 raise errors.OpExecError("Error during master delay test")
5361 if self.op.on_nodes:
5362 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5364 raise errors.OpExecError("Complete failure from rpc call")
5365 for node, node_result in result.items():
5367 raise errors.OpExecError("Failure during rpc call to node %s,"
5368 " result: %s" % (node, node_result))
5371 class IAllocator(object):
5372 """IAllocator framework.
5374 An IAllocator instance has four sets of attributes:
5375 - cfg/sstore that are needed to query the cluster
5376 - input data (all members of the _KEYS class attribute are required)
5377 - four buffer attributes (in|out_data|text), that represent the
5378 input (to the external script) in text and data structure format,
5379 and the output from it, again in two formats
5380 - the result variables from the script (success, info, nodes) for
5385 "mem_size", "disks", "disk_template",
5386 "os", "tags", "nics", "vcpus",
5392 def __init__(self, cfg, sstore, mode, name, **kwargs):
5394 self.sstore = sstore
5395 # init buffer variables
5396 self.in_text = self.out_text = self.in_data = self.out_data = None
5397 # init all input fields so that pylint is happy
5400 self.mem_size = self.disks = self.disk_template = None
5401 self.os = self.tags = self.nics = self.vcpus = None
5402 self.relocate_from = None
5404 self.required_nodes = None
5405 # init result fields
5406 self.success = self.info = self.nodes = None
5407 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5408 keyset = self._ALLO_KEYS
5409 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5410 keyset = self._RELO_KEYS
5412 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5413 " IAllocator" % self.mode)
5415 if key not in keyset:
5416 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5417 " IAllocator" % key)
5418 setattr(self, key, kwargs[key])
5420 if key not in kwargs:
5421 raise errors.ProgrammerError("Missing input parameter '%s' to"
5422 " IAllocator" % key)
5423 self._BuildInputData()
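# Hedged overview of the buffer attributes mentioned in the class docstring
# (shapes indicative only, values hypothetical):
#   in_data  : {"cluster_name": ..., "nodes": {...}, "instances": {...},
#               "request": {...}}
#   in_text  : in_data serialized with serializer.Dump()
#   out_text : the allocator script's output, e.g.
#              '{"success": true, "info": "ok", "nodes": ["node2.example.com"]}'
#   out_data : the parsed form of out_text, checked by _ValidateResult()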
5425 def _ComputeClusterData(self):
5426 """Compute the generic allocator input data.
5428 This is the data that is independent of the actual operation.
5435 "cluster_name": self.sstore.GetClusterName(),
5436 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5437 "hypervisor_type": self.sstore.GetHypervisorType(),
5438 # we don't have job IDs
5441 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5445 node_list = cfg.GetNodeList()
5446 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5447 for nname in node_list:
5448 ninfo = cfg.GetNodeInfo(nname)
5449 if nname not in node_data or not isinstance(node_data[nname], dict):
5450 raise errors.OpExecError("Can't get data for node %s" % nname)
5451 remote_info = node_data[nname]
5452 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5453 'vg_size', 'vg_free', 'cpu_total']:
5454 if attr not in remote_info:
5455 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5458 remote_info[attr] = int(remote_info[attr])
5459 except ValueError, err:
5460 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5461 " %s" % (nname, attr, str(err)))
5462 # compute memory used by primary instances
5463 i_p_mem = i_p_up_mem = 0
5464 for iinfo in i_list:
5465 if iinfo.primary_node == nname:
5466 i_p_mem += iinfo.memory
5467 if iinfo.status == "up":
5468 i_p_up_mem += iinfo.memory
5470 # compute memory used by instances
5472 "tags": list(ninfo.GetTags()),
5473 "total_memory": remote_info['memory_total'],
5474 "reserved_memory": remote_info['memory_dom0'],
5475 "free_memory": remote_info['memory_free'],
5476 "i_pri_memory": i_p_mem,
5477 "i_pri_up_memory": i_p_up_mem,
5478 "total_disk": remote_info['vg_size'],
5479 "free_disk": remote_info['vg_free'],
5480 "primary_ip": ninfo.primary_ip,
5481 "secondary_ip": ninfo.secondary_ip,
5482 "total_cpus": remote_info['cpu_total'],
5484 node_results[nname] = pnr
5485 data["nodes"] = node_results
5489 for iinfo in i_list:
5490 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5491 for n in iinfo.nics]
5493 "tags": list(iinfo.GetTags()),
5494 "should_run": iinfo.status == "up",
5495 "vcpus": iinfo.vcpus,
5496 "memory": iinfo.memory,
5498 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5500 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5501 "disk_template": iinfo.disk_template,
5503 instance_data[iinfo.name] = pir
5505 data["instances"] = instance_data
5509 def _AddNewInstance(self):
5510 """Add new instance data to allocator structure.
5512 This in combination with _ComputeClusterData will create the
5513 correct structure needed as input for the allocator.
5515 The checks for the completeness of the opcode must have already been
5520 if len(self.disks) != 2:
5521 raise errors.OpExecError("Only two-disk configurations supported")
5523 disk_space = _ComputeDiskSize(self.disk_template,
5524 self.disks[0]["size"], self.disks[1]["size"])
5526 if self.disk_template in constants.DTS_NET_MIRROR:
5527 self.required_nodes = 2
5529 self.required_nodes = 1
5533 "disk_template": self.disk_template,
5536 "vcpus": self.vcpus,
5537 "memory": self.mem_size,
5538 "disks": self.disks,
5539 "disk_space_total": disk_space,
5541 "required_nodes": self.required_nodes,
5543 data["request"] = request
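# A hedged example of the allocation "request" built here (only the keys
# visible above are certain; values and the remaining keys are hypothetical):
#   {"disk_template": "drbd",
#    "vcpus": 1,
#    "memory": 256,
#    "disks": [{"size": 1024, "mode": "w"}, {"size": 4096, "mode": "w"}],
#    "disk_space_total": 5376,   # sizes plus assumed DRBD metadata overhead
#    "required_nodes": 2,
#    ...}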
5545 def _AddRelocateInstance(self):
5546 """Add relocate instance data to allocator structure.
5548 This in combination with _ComputeClusterData will create the
5549 correct structure needed as input for the allocator.
5551 The checks for the completeness of the opcode must have already been
5555 instance = self.cfg.GetInstanceInfo(self.name)
5556 if instance is None:
5557 raise errors.ProgrammerError("Unknown instance '%s' passed to"
5558 " IAllocator" % self.name)
5560 if instance.disk_template not in constants.DTS_NET_MIRROR:
5561 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5563 if len(instance.secondary_nodes) != 1:
5564 raise errors.OpPrereqError("Instance does not have exactly one secondary node")
5566 self.required_nodes = 1
5568 disk_space = _ComputeDiskSize(instance.disk_template,
5569 instance.disks[0].size,
5570 instance.disks[1].size)
5575 "disk_space_total": disk_space,
5576 "required_nodes": self.required_nodes,
5577 "relocate_from": self.relocate_from,
5579 self.in_data["request"] = request
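# A hedged example of the relocation "request" built here (values
# hypothetical, additional keys elided):
#   {"disk_space_total": 5376,
#    "required_nodes": 1,
#    "relocate_from": ["node2.example.com"],
#    ...}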
5581 def _BuildInputData(self):
5582 """Build input data structures.
5585 self._ComputeClusterData()
5587 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5588 self._AddNewInstance()
5590 self._AddRelocateInstance()
5592 self.in_text = serializer.Dump(self.in_data)
5594 def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5595 """Run an instance allocator and return the results.
5600 result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5602 if not isinstance(result, tuple) or len(result) != 4:
5603 raise errors.OpExecError("Invalid result from master iallocator runner")
5605 rcode, stdout, stderr, fail = result
5607 if rcode == constants.IARUN_NOTFOUND:
5608 raise errors.OpExecError("Can't find allocator '%s'" % name)
5609 elif rcode == constants.IARUN_FAILURE:
5610 raise errors.OpExecError("Instance allocator call failed: %s,"
5612 (fail, stdout+stderr))
5613 self.out_text = stdout
5615 self._ValidateResult()
5617 def _ValidateResult(self):
5618 """Process the allocator results.
5620 This will process and if successful save the result in
5621 self.out_data and the other parameters.
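# For example (hedged, values hypothetical), a well-formed reply is
#   '{"success": true, "info": "allocation successful", "nodes": ["node2"]}'
# i.e. a dict carrying at least "success", "info" and "nodes", with "nodes"
# being a list; anything else is rejected below.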
5625 rdict = serializer.Load(self.out_text)
5626 except Exception, err:
5627 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5629 if not isinstance(rdict, dict):
5630 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5632 for key in "success", "info", "nodes":
5633 if key not in rdict:
5634 raise errors.OpExecError("Can't parse iallocator results:"
5635 " missing key '%s'" % key)
5636 setattr(self, key, rdict[key])
5638 if not isinstance(rdict["nodes"], list):
5639 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5641 self.out_data = rdict
5644 class LUTestAllocator(NoHooksLU):
5645 """Run allocator tests.
5647 This LU runs the allocator tests
5650 _OP_REQP = ["direction", "mode", "name"]
5652 def CheckPrereq(self):
5653 """Check prerequisites.
5655 This checks the opcode parameters depending on the direction and mode of the test.
5658 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5659 for attr in ["name", "mem_size", "disks", "disk_template",
5660 "os", "tags", "nics", "vcpus"]:
5661 if not hasattr(self.op, attr):
5662 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5664 iname = self.cfg.ExpandInstanceName(self.op.name)
5665 if iname is not None:
5666 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5668 if not isinstance(self.op.nics, list):
5669 raise errors.OpPrereqError("Invalid parameter 'nics'")
5670 for row in self.op.nics:
5671 if (not isinstance(row, dict) or
5674 "bridge" not in row):
5675 raise errors.OpPrereqError("Invalid contents of the"
5676 " 'nics' parameter")
5677 if not isinstance(self.op.disks, list):
5678 raise errors.OpPrereqError("Invalid parameter 'disks'")
5679 if len(self.op.disks) != 2:
5680 raise errors.OpPrereqError("Only two-disk configurations supported")
5681 for row in self.op.disks:
5682 if (not isinstance(row, dict) or
5683 "size" not in row or
5684 not isinstance(row["size"], int) or
5685 "mode" not in row or
5686 row["mode"] not in ['r', 'w']):
5687 raise errors.OpPrereqError("Invalid contents of the"
5688 " 'disks' parameter")
5689 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5690 if not hasattr(self.op, "name"):
5691 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5692 fname = self.cfg.ExpandInstanceName(self.op.name)
5694 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5696 self.op.name = fname
5697 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5699 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5702 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5703 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5704 raise errors.OpPrereqError("Missing allocator name")
5705 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5706 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5709 def Exec(self, feedback_fn):
5710 """Run the allocator test.
5713 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5714 ial = IAllocator(self.cfg, self.sstore,
5717 mem_size=self.op.mem_size,
5718 disks=self.op.disks,
5719 disk_template=self.op.disk_template,
5723 vcpus=self.op.vcpus,
5726 ial = IAllocator(self.cfg, self.sstore,
5729 relocate_from=list(self.relocate_from),
5732 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5733 result = ial.in_text
5735 ial.Run(self.op.allocator, validate=False)
5736 result = ial.out_text