4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement CheckPrereq which also fills in the opcode instance
53 with all the fields (even if as None)
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements (REQ_CLUSTER,
58 REQ_MASTER); note that all commands require root permissions
67 def __init__(self, processor, op, cfg, sstore):
68 """Constructor for LogicalUnit.
70 This needs to be overriden in derived classes in order to check op
78 for attr_name in self._OP_REQP:
79 attr_val = getattr(op, attr_name, None)
81 raise errors.OpPrereqError("Required parameter '%s' missing" %
84 if not cfg.IsCluster():
85 raise errors.OpPrereqError("Cluster not initialized yet,"
86 " use 'gnt-cluster init' first.")
88 master = sstore.GetMasterNode()
89 if master != utils.HostInfo().name:
90 raise errors.OpPrereqError("Commands must be run on the master"
93 def CheckPrereq(self):
94 """Check prerequisites for this LU.
96 This method should check that the prerequisites for the execution
97 of this LU are fulfilled. It can do internode communication, but
98 it should be idempotent - no cluster or system changes are
101 The method should raise errors.OpPrereqError in case something is
102 not fulfilled. Its return value is ignored.
104 This method should also update all the parameters of the opcode to
105 their canonical form; e.g. a short node name must be fully
106 expanded after this method has successfully completed (so that
107 hooks, logging, etc. work correctly).
110 raise NotImplementedError
112 def Exec(self, feedback_fn):
115 This method should implement the actual work. It should raise
116 errors.OpExecError for failures that are somewhat dealt with in
120 raise NotImplementedError
122 def BuildHooksEnv(self):
123 """Build hooks environment for this LU.
125 This method should return a three-node tuple consisting of: a dict
126 containing the environment that will be used for running the
127 specific hook for this LU, a list of node names on which the hook
128 should run before the execution, and a list of node names on which
129 the hook should run after the execution.
131 The keys of the dict must not have 'GANETI_' prefixed as this will
132 be handled in the hooks runner. Also note additional keys will be
133 added by the hooks runner. If the LU doesn't define any
134 environment, an empty dict (and not None) should be returned.
136 No nodes should be returned as an empty list (and not None).
138 Note that if the HPATH for a LU class is None, this function will
142 raise NotImplementedError
144 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
145 """Notify the LU about the results of its hooks.
147 This method is called every time a hooks phase is executed, and notifies
148 the Logical Unit about the hooks' result. The LU can then use it to alter
149 its result based on the hooks. By default the method does nothing and the
150 previous result is passed back unchanged but any LU can define it if it
151 wants to use the local cluster hook-scripts somehow.
154 phase: the hooks phase that has just been run
155 hooks_results: the results of the multi-node hooks rpc call
156 feedback_fn: function to send feedback back to the caller
157 lu_result: the previous result this LU had, or None in the PRE phase.
163 class NoHooksLU(LogicalUnit):
164 """Simple LU which runs no hooks.
166 This LU is intended as a parent for other LogicalUnits which will
167 run no hooks, in order to reduce duplicate code.
174 def _AddHostToEtcHosts(hostname):
175 """Wrapper around utils.SetEtcHostsEntry.
178 hi = utils.HostInfo(name=hostname)
179 utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
182 def _RemoveHostFromEtcHosts(hostname):
183 """Wrapper around utils.RemoveEtcHostsEntry.
186 hi = utils.HostInfo(name=hostname)
187 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
188 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
191 def _GetWantedNodes(lu, nodes):
192 """Returns list of checked and expanded node names.
195 nodes: List of nodes (strings) or None for all
198 if not isinstance(nodes, list):
199 raise errors.OpPrereqError("Invalid argument type 'nodes'")
205 node = lu.cfg.ExpandNodeName(name)
207 raise errors.OpPrereqError("No such node name '%s'" % name)
211 wanted = lu.cfg.GetNodeList()
212 return utils.NiceSort(wanted)
215 def _GetWantedInstances(lu, instances):
216 """Returns list of checked and expanded instance names.
219 instances: List of instances (strings) or None for all
222 if not isinstance(instances, list):
223 raise errors.OpPrereqError("Invalid argument type 'instances'")
228 for name in instances:
229 instance = lu.cfg.ExpandInstanceName(name)
231 raise errors.OpPrereqError("No such instance name '%s'" % name)
232 wanted.append(instance)
235 wanted = lu.cfg.GetInstanceList()
236 return utils.NiceSort(wanted)
239 def _CheckOutputFields(static, dynamic, selected):
240 """Checks whether all selected fields are valid.
243 static: Static fields
244 dynamic: Dynamic fields
247 static_fields = frozenset(static)
248 dynamic_fields = frozenset(dynamic)
250 all_fields = static_fields | dynamic_fields
252 if not all_fields.issuperset(selected):
253 raise errors.OpPrereqError("Unknown output fields selected: %s"
254 % ",".join(frozenset(selected).
255 difference(all_fields)))
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259 memory, vcpus, nics):
260 """Builds instance related env variables for hooks from single variables.
263 secondary_nodes: List of secondary nodes as strings
267 "INSTANCE_NAME": name,
268 "INSTANCE_PRIMARY": primary_node,
269 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270 "INSTANCE_OS_TYPE": os_type,
271 "INSTANCE_STATUS": status,
272 "INSTANCE_MEMORY": memory,
273 "INSTANCE_VCPUS": vcpus,
277 nic_count = len(nics)
278 for idx, (ip, bridge, mac) in enumerate(nics):
281 env["INSTANCE_NIC%d_IP" % idx] = ip
282 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
287 env["INSTANCE_NIC_COUNT"] = nic_count
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293 """Builds instance related env variables for hooks from an object.
296 instance: objects.Instance object of instance
297 override: dict of values to override
300 'name': instance.name,
301 'primary_node': instance.primary_node,
302 'secondary_nodes': instance.secondary_nodes,
303 'os_type': instance.os,
304 'status': instance.os,
305 'memory': instance.memory,
306 'vcpus': instance.vcpus,
307 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
310 args.update(override)
311 return _BuildInstanceHookEnv(**args)
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315 """Ensure a node has a correct known_hosts entry.
318 fullnode - Fully qualified domain name of host. (str)
319 ip - IPv4 address of host (str)
320 pubkey - the public key of the cluster
323 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
326 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
335 logger.Debug('read %s' % (repr(rawline),))
337 parts = rawline.rstrip('\r\n').split()
339 # Ignore unwanted lines
340 if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341 fields = parts[0].split(',')
346 for spec in [ ip, fullnode ]:
347 if spec not in fields:
352 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353 if haveall and key == pubkey:
355 save_lines.append(rawline)
356 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
359 if havesome and (not haveall or key != pubkey):
361 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
364 save_lines.append(rawline)
367 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
371 save_lines = save_lines + add_lines
373 # Write a new file and replace old.
374 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
376 newfile = os.fdopen(fd, 'w')
378 newfile.write(''.join(save_lines))
381 logger.Debug("Wrote new known_hosts.")
382 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
385 # Simply appending a new line will do the trick.
387 for add in add_lines:
393 def _HasValidVG(vglist, vgname):
394 """Checks if the volume group list is valid.
396 A non-None return value means there's an error, and the return value
397 is the error message.
400 vgsize = vglist.get(vgname, None)
402 return "volume group '%s' missing" % vgname
404 return ("volume group '%s' too small (20480MiB required, %dMib found)" %
409 def _InitSSHSetup(node):
410 """Setup the SSH configuration for the cluster.
413 This generates a dsa keypair for root, adds the pub key to the
414 permitted hosts and adds the hostkey to its own known hosts.
417 node: the name of this host as a fqdn
420 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
422 for name in priv_key, pub_key:
423 if os.path.exists(name):
424 utils.CreateBackup(name)
425 utils.RemoveFile(name)
427 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
431 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
434 f = open(pub_key, 'r')
436 utils.AddAuthorizedKey(auth_keys, f.read(8192))
441 def _InitGanetiServerSetup(ss):
442 """Setup the necessary configuration for the initial node daemon.
444 This creates the nodepass file containing the shared password for
445 the cluster and also generates the SSL certificate.
448 # Create pseudo random password
449 randpass = sha.new(os.urandom(64)).hexdigest()
450 # and write it into sstore
451 ss.SetKey(ss.SS_NODED_PASS, randpass)
453 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454 "-days", str(365*5), "-nodes", "-x509",
455 "-keyout", constants.SSL_CERT_FILE,
456 "-out", constants.SSL_CERT_FILE, "-batch"])
458 raise errors.OpExecError("could not generate server ssl cert, command"
459 " %s had exitcode %s and error message %s" %
460 (result.cmd, result.exit_code, result.output))
462 os.chmod(constants.SSL_CERT_FILE, 0400)
464 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
467 raise errors.OpExecError("Could not start the node daemon, command %s"
468 " had exitcode %s and error %s" %
469 (result.cmd, result.exit_code, result.output))
472 def _CheckInstanceBridgesExist(instance):
473 """Check that the brigdes needed by an instance exist.
476 # check bridges existance
477 brlist = [nic.bridge for nic in instance.nics]
478 if not rpc.call_bridges_exist(instance.primary_node, brlist):
479 raise errors.OpPrereqError("one or more target bridges %s does not"
480 " exist on destination node '%s'" %
481 (brlist, instance.primary_node))
484 class LUInitCluster(LogicalUnit):
485 """Initialise the cluster.
488 HPATH = "cluster-init"
489 HTYPE = constants.HTYPE_CLUSTER
490 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491 "def_bridge", "master_netdev"]
494 def BuildHooksEnv(self):
497 Notes: Since we don't require a cluster, we must manually add
498 ourselves in the post-run node list.
501 env = {"OP_TARGET": self.op.cluster_name}
502 return env, [], [self.hostname.name]
504 def CheckPrereq(self):
505 """Verify that the passed name is a valid one.
508 if config.ConfigWriter.IsCluster():
509 raise errors.OpPrereqError("Cluster is already initialised")
511 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512 if not os.path.exists(constants.VNC_PASSWORD_FILE):
513 raise errors.OpPrereqError("Please prepare the cluster VNC"
515 constants.VNC_PASSWORD_FILE)
517 self.hostname = hostname = utils.HostInfo()
519 if hostname.ip.startswith("127."):
520 raise errors.OpPrereqError("This host's IP resolves to the private"
521 " range (%s). Please fix DNS or %s." %
522 (hostname.ip, constants.ETC_HOSTS))
524 if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525 source=constants.LOCALHOST_IP_ADDRESS):
526 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527 " to %s,\nbut this ip address does not"
528 " belong to this host."
529 " Aborting." % hostname.ip)
531 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
533 if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
535 raise errors.OpPrereqError("Cluster IP already active. Aborting.")
537 secondary_ip = getattr(self.op, "secondary_ip", None)
538 if secondary_ip and not utils.IsValidIP(secondary_ip):
539 raise errors.OpPrereqError("Invalid secondary ip given")
541 secondary_ip != hostname.ip and
542 (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543 source=constants.LOCALHOST_IP_ADDRESS))):
544 raise errors.OpPrereqError("You gave %s as secondary IP,"
545 " but it does not belong to this host." %
547 self.secondary_ip = secondary_ip
549 # checks presence of the volume group given
550 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
553 raise errors.OpPrereqError("Error: %s" % vgstatus)
555 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
557 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
560 if self.op.hypervisor_type not in constants.HYPER_TYPES:
561 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562 self.op.hypervisor_type)
564 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
566 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567 (self.op.master_netdev,
568 result.output.strip()))
570 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572 raise errors.OpPrereqError("Init.d script '%s' missing or not"
573 " executable." % constants.NODE_INITD_SCRIPT)
575 def Exec(self, feedback_fn):
576 """Initialize the cluster.
579 clustername = self.clustername
580 hostname = self.hostname
582 # set up the simple store
583 self.sstore = ss = ssconf.SimpleStore()
584 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
590 # set up the inter-node password and certificate
591 _InitGanetiServerSetup(ss)
593 # start the master ip
594 rpc.call_node_start_master(hostname.name)
596 # set up ssh config and /etc/hosts
597 f = open(constants.SSH_HOST_RSA_PUB, 'r')
602 sshkey = sshline.split(" ")[1]
604 _AddHostToEtcHosts(hostname.name)
606 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
608 _InitSSHSetup(hostname.name)
610 # init of cluster config file
611 self.cfg = cfgw = config.ConfigWriter()
612 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613 sshkey, self.op.mac_prefix,
614 self.op.vg_name, self.op.def_bridge)
617 class LUDestroyCluster(NoHooksLU):
618 """Logical unit for destroying the cluster.
623 def CheckPrereq(self):
624 """Check prerequisites.
626 This checks whether the cluster is empty.
628 Any errors are signalled by raising errors.OpPrereqError.
631 master = self.sstore.GetMasterNode()
633 nodelist = self.cfg.GetNodeList()
634 if len(nodelist) != 1 or nodelist[0] != master:
635 raise errors.OpPrereqError("There are still %d node(s) in"
636 " this cluster." % (len(nodelist) - 1))
637 instancelist = self.cfg.GetInstanceList()
639 raise errors.OpPrereqError("There are still %d instance(s) in"
640 " this cluster." % len(instancelist))
642 def Exec(self, feedback_fn):
643 """Destroys the cluster.
646 master = self.sstore.GetMasterNode()
647 if not rpc.call_node_stop_master(master):
648 raise errors.OpExecError("Could not disable the master role")
649 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650 utils.CreateBackup(priv_key)
651 utils.CreateBackup(pub_key)
652 rpc.call_node_leave_cluster(master)
655 class LUVerifyCluster(LogicalUnit):
656 """Verifies the cluster status.
659 HPATH = "cluster-verify"
660 HTYPE = constants.HTYPE_CLUSTER
661 _OP_REQP = ["skip_checks"]
663 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664 remote_version, feedback_fn):
665 """Run multiple tests against a node.
668 - compares ganeti version
669 - checks vg existance and size > 20G
670 - checks config file checksum
671 - checks ssh to other nodes
674 node: name of the node to check
675 file_list: required list of files
676 local_cksum: dictionary of local files and their checksums
679 # compares ganeti version
680 local_version = constants.PROTOCOL_VERSION
681 if not remote_version:
682 feedback_fn(" - ERROR: connection to %s failed" % (node))
685 if local_version != remote_version:
686 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
687 (local_version, node, remote_version))
690 # checks vg existance and size > 20G
694 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
698 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
700 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
703 # checks config file checksum
706 if 'filelist' not in node_result:
708 feedback_fn(" - ERROR: node hasn't returned file checksum data")
710 remote_cksum = node_result['filelist']
711 for file_name in file_list:
712 if file_name not in remote_cksum:
714 feedback_fn(" - ERROR: file '%s' missing" % file_name)
715 elif remote_cksum[file_name] != local_cksum[file_name]:
717 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
719 if 'nodelist' not in node_result:
721 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
723 if node_result['nodelist']:
725 for node in node_result['nodelist']:
726 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
727 (node, node_result['nodelist'][node]))
728 if 'node-net-test' not in node_result:
730 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
732 if node_result['node-net-test']:
734 nlist = utils.NiceSort(node_result['node-net-test'].keys())
736 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
737 (node, node_result['node-net-test'][node]))
739 hyp_result = node_result.get('hypervisor', None)
740 if hyp_result is not None:
741 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
744 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
745 node_instance, feedback_fn):
746 """Verify an instance.
748 This function checks to see if the required block devices are
749 available on the instance's node.
754 node_current = instanceconfig.primary_node
757 instanceconfig.MapLVsByNode(node_vol_should)
759 for node in node_vol_should:
760 for volume in node_vol_should[node]:
761 if node not in node_vol_is or volume not in node_vol_is[node]:
762 feedback_fn(" - ERROR: volume %s missing on node %s" %
766 if not instanceconfig.status == 'down':
767 if (node_current not in node_instance or
768 not instance in node_instance[node_current]):
769 feedback_fn(" - ERROR: instance %s not running on node %s" %
770 (instance, node_current))
773 for node in node_instance:
774 if (not node == node_current):
775 if instance in node_instance[node]:
776 feedback_fn(" - ERROR: instance %s should not run on node %s" %
782 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
783 """Verify if there are any unknown volumes in the cluster.
785 The .os, .swap and backup volumes are ignored. All other volumes are
791 for node in node_vol_is:
792 for volume in node_vol_is[node]:
793 if node not in node_vol_should or volume not in node_vol_should[node]:
794 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
799 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
800 """Verify the list of running instances.
802 This checks what instances are running but unknown to the cluster.
806 for node in node_instance:
807 for runninginstance in node_instance[node]:
808 if runninginstance not in instancelist:
809 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
810 (runninginstance, node))
814 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
815 """Verify N+1 Memory Resilience.
817 Check that if one single node dies we can still start all the instances it
823 for node, nodeinfo in node_info.iteritems():
824 # This code checks that every node which is now listed as secondary has
825 # enough memory to host all instances it is supposed to should a single
826 # other node in the cluster fail.
827 # FIXME: not ready for failover to an arbitrary node
828 # FIXME: does not support file-backed instances
829 # WARNING: we currently take into account down instances as well as up
830 # ones, considering that even if they're down someone might want to start
831 # them even in the event of a node failure.
832 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
834 for instance in instances:
835 needed_mem += instance_cfg[instance].memory
836 if nodeinfo['mfree'] < needed_mem:
837 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
838 " failovers should node %s fail" % (node, prinode))
842 def CheckPrereq(self):
843 """Check prerequisites.
845 Transform the list of checks we're going to skip into a set and check that
846 all its members are valid.
849 self.skip_set = frozenset(self.op.skip_checks)
850 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
851 raise errors.OpPrereqError("Invalid checks to be skipped specified")
853 def BuildHooksEnv(self):
856 Cluster-Verify hooks just rone in the post phase and their failure makes
857 the output be logged in the verify output and the verification to fail.
860 all_nodes = self.cfg.GetNodeList()
861 tags = self.cfg.GetClusterInfo().GetTags()
862 # TODO: populate the environment with useful information for verify hooks
864 "CLUSTER_TAGS": " ".join(tags),
866 return env, [], all_nodes
868 def Exec(self, feedback_fn):
869 """Verify integrity of cluster, performing various test on nodes.
873 feedback_fn("* Verifying global settings")
874 for msg in self.cfg.VerifyConfig():
875 feedback_fn(" - ERROR: %s" % msg)
877 vg_name = self.cfg.GetVGName()
878 nodelist = utils.NiceSort(self.cfg.GetNodeList())
879 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
880 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
881 i_non_redundant = [] # Non redundant instances
887 # FIXME: verify OS list
889 file_names = list(self.sstore.GetFileList())
890 file_names.append(constants.SSL_CERT_FILE)
891 file_names.append(constants.CLUSTER_CONF_FILE)
892 local_checksums = utils.FingerprintFiles(file_names)
894 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
895 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
896 all_instanceinfo = rpc.call_instance_list(nodelist)
897 all_vglist = rpc.call_vg_list(nodelist)
898 node_verify_param = {
899 'filelist': file_names,
900 'nodelist': nodelist,
902 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
903 for node in nodeinfo]
905 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
906 all_rversion = rpc.call_version(nodelist)
907 all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
909 incomplete_nodeinfo = False
911 for node in nodelist:
912 feedback_fn("* Verifying node %s" % node)
913 result = self._VerifyNode(node, file_names, local_checksums,
914 all_vglist[node], all_nvinfo[node],
915 all_rversion[node], feedback_fn)
919 volumeinfo = all_volumeinfo[node]
921 if isinstance(volumeinfo, basestring):
922 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
923 (node, volumeinfo[-400:].encode('string_escape')))
925 node_volume[node] = {}
926 elif not isinstance(volumeinfo, dict):
927 feedback_fn(" - ERROR: connection to %s failed" % (node,))
929 incomplete_nodeinfo = True
932 node_volume[node] = volumeinfo
935 nodeinstance = all_instanceinfo[node]
936 if type(nodeinstance) != list:
937 feedback_fn(" - ERROR: connection to %s failed" % (node,))
939 incomplete_nodeinfo = True
942 node_instance[node] = nodeinstance
945 nodeinfo = all_ninfo[node]
946 if not isinstance(nodeinfo, dict):
947 feedback_fn(" - ERROR: connection to %s failed" % (node,))
949 incomplete_nodeinfo = True
954 "mfree": int(nodeinfo['memory_free']),
955 "dfree": int(nodeinfo['vg_free']),
958 # dictionary holding all instances this node is secondary for,
959 # grouped by their primary node. Each key is a cluster node, and each
960 # value is a list of instances which have the key as primary and the
961 # current node as secondary. this is handy to calculate N+1 memory
962 # availability if you can only failover from a primary to its
964 "sinst-by-pnode": {},
966 except (ValueError, TypeError):
967 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
969 incomplete_nodeinfo = True
974 for instance in instancelist:
975 feedback_fn("* Verifying instance %s" % instance)
976 inst_config = self.cfg.GetInstanceInfo(instance)
977 result = self._VerifyInstance(instance, inst_config, node_volume,
978 node_instance, feedback_fn)
981 inst_config.MapLVsByNode(node_vol_should)
983 instance_cfg[instance] = inst_config
985 pnode = inst_config.primary_node
986 if pnode in node_info:
987 node_info[pnode]['pinst'].append(instance)
989 feedback_fn(" - ERROR: instance %s, connection to primary node"
990 " %s failed" % (instance, pnode))
993 # If the instance is non-redundant we cannot survive losing its primary
994 # node, so we are not N+1 compliant. On the other hand we have no disk
995 # templates with more than one secondary so that situation is not well
997 # FIXME: does not support file-backed instances
998 if len(inst_config.secondary_nodes) == 0:
999 i_non_redundant.append(instance)
1000 elif len(inst_config.secondary_nodes) > 1:
1001 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1004 for snode in inst_config.secondary_nodes:
1005 if snode in node_info:
1006 node_info[snode]['sinst'].append(instance)
1007 if pnode not in node_info[snode]['sinst-by-pnode']:
1008 node_info[snode]['sinst-by-pnode'][pnode] = []
1009 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1011 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1012 " %s failed" % (instance, snode))
1014 feedback_fn("* Verifying orphan volumes")
1015 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1019 feedback_fn("* Verifying remaining instances")
1020 result = self._VerifyOrphanInstances(instancelist, node_instance,
1024 if (constants.VERIFY_NPLUSONE_MEM not in self.skip_set and
1025 not incomplete_nodeinfo):
1026 feedback_fn("* Verifying N+1 Memory redundancy")
1027 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1030 feedback_fn("* Other Notes")
1032 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1033 % len(i_non_redundant))
1037 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1038 """Analize the post-hooks' result, handle it, and send some
1039 nicely-formatted feedback back to the user.
1042 phase: the hooks phase that has just been run
1043 hooks_results: the results of the multi-node hooks rpc call
1044 feedback_fn: function to send feedback back to the caller
1045 lu_result: previous Exec result
1048 # We only really run POST phase hooks, and are only interested in their results
1049 if phase == constants.HOOKS_PHASE_POST:
1050 # Used to change hooks' output to proper indentation
1051 indent_re = re.compile('^', re.M)
1052 feedback_fn("* Hooks Results")
1053 if not hooks_results:
1054 feedback_fn(" - ERROR: general communication failure")
1057 for node_name in hooks_results:
1058 show_node_header = True
1059 res = hooks_results[node_name]
1060 if res is False or not isinstance(res, list):
1061 feedback_fn(" Communication failure")
1064 for script, hkr, output in res:
1065 if hkr == constants.HKR_FAIL:
1066 # The node header is only shown once, if there are
1067 # failing hooks on that node
1068 if show_node_header:
1069 feedback_fn(" Node %s:" % node_name)
1070 show_node_header = False
1071 feedback_fn(" ERROR: Script %s failed, output:" % script)
1072 output = indent_re.sub(' ', output)
1073 feedback_fn("%s" % output)
1079 class LUVerifyDisks(NoHooksLU):
1080 """Verifies the cluster disks status.
1085 def CheckPrereq(self):
1086 """Check prerequisites.
1088 This has no prerequisites.
1093 def Exec(self, feedback_fn):
1094 """Verify integrity of cluster disks.
1097 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1099 vg_name = self.cfg.GetVGName()
1100 nodes = utils.NiceSort(self.cfg.GetNodeList())
1101 instances = [self.cfg.GetInstanceInfo(name)
1102 for name in self.cfg.GetInstanceList()]
1105 for inst in instances:
1107 if (inst.status != "up" or
1108 inst.disk_template not in constants.DTS_NET_MIRROR):
1110 inst.MapLVsByNode(inst_lvs)
1111 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1112 for node, vol_list in inst_lvs.iteritems():
1113 for vol in vol_list:
1114 nv_dict[(node, vol)] = inst
1119 node_lvs = rpc.call_volume_list(nodes, vg_name)
1124 lvs = node_lvs[node]
1126 if isinstance(lvs, basestring):
1127 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1128 res_nlvm[node] = lvs
1129 elif not isinstance(lvs, dict):
1130 logger.Info("connection to node %s failed or invalid data returned" %
1132 res_nodes.append(node)
1135 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1136 inst = nv_dict.pop((node, lv_name), None)
1137 if (not lv_online and inst is not None
1138 and inst.name not in res_instances):
1139 res_instances.append(inst.name)
1141 # any leftover items in nv_dict are missing LVs, let's arrange the
1143 for key, inst in nv_dict.iteritems():
1144 if inst.name not in res_missing:
1145 res_missing[inst.name] = []
1146 res_missing[inst.name].append(key)
1151 class LURenameCluster(LogicalUnit):
1152 """Rename the cluster.
1155 HPATH = "cluster-rename"
1156 HTYPE = constants.HTYPE_CLUSTER
1159 def BuildHooksEnv(self):
1164 "OP_TARGET": self.sstore.GetClusterName(),
1165 "NEW_NAME": self.op.name,
1167 mn = self.sstore.GetMasterNode()
1168 return env, [mn], [mn]
1170 def CheckPrereq(self):
1171 """Verify that the passed name is a valid one.
1174 hostname = utils.HostInfo(self.op.name)
1176 new_name = hostname.name
1177 self.ip = new_ip = hostname.ip
1178 old_name = self.sstore.GetClusterName()
1179 old_ip = self.sstore.GetMasterIP()
1180 if new_name == old_name and new_ip == old_ip:
1181 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1182 " cluster has changed")
1183 if new_ip != old_ip:
1184 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1185 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1186 " reachable on the network. Aborting." %
1189 self.op.name = new_name
1191 def Exec(self, feedback_fn):
1192 """Rename the cluster.
1195 clustername = self.op.name
1199 # shutdown the master IP
1200 master = ss.GetMasterNode()
1201 if not rpc.call_node_stop_master(master):
1202 raise errors.OpExecError("Could not disable the master role")
1206 ss.SetKey(ss.SS_MASTER_IP, ip)
1207 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1209 # Distribute updated ss config to all nodes
1210 myself = self.cfg.GetNodeInfo(master)
1211 dist_nodes = self.cfg.GetNodeList()
1212 if myself.name in dist_nodes:
1213 dist_nodes.remove(myself.name)
1215 logger.Debug("Copying updated ssconf data to all nodes")
1216 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1217 fname = ss.KeyToFilename(keyname)
1218 result = rpc.call_upload_file(dist_nodes, fname)
1219 for to_node in dist_nodes:
1220 if not result[to_node]:
1221 logger.Error("copy of file %s to node %s failed" %
1224 if not rpc.call_node_start_master(master):
1225 logger.Error("Could not re-enable the master role on the master,"
1226 " please restart manually.")
1229 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1230 """Sleep and poll for an instance's disk to sync.
1233 if not instance.disks:
1237 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1239 node = instance.primary_node
1241 for dev in instance.disks:
1242 cfgw.SetDiskID(dev, node)
1248 cumul_degraded = False
1249 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1251 proc.LogWarning("Can't get any data from node %s" % node)
1254 raise errors.RemoteError("Can't contact node %s for mirror data,"
1255 " aborting." % node)
1259 for i in range(len(rstats)):
1262 proc.LogWarning("Can't compute data for node %s/%s" %
1263 (node, instance.disks[i].iv_name))
1265 # we ignore the ldisk parameter
1266 perc_done, est_time, is_degraded, _ = mstat
1267 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1268 if perc_done is not None:
1270 if est_time is not None:
1271 rem_time = "%d estimated seconds remaining" % est_time
1274 rem_time = "no time estimate"
1275 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1276 (instance.disks[i].iv_name, perc_done, rem_time))
1283 time.sleep(min(60, max_time))
1289 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1290 return not cumul_degraded
1293 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1294 """Check that mirrors are not degraded.
1296 The ldisk parameter, if True, will change the test from the
1297 is_degraded attribute (which represents overall non-ok status for
1298 the device(s)) to the ldisk (representing the local storage status).
1301 cfgw.SetDiskID(dev, node)
1308 if on_primary or dev.AssembleOnSecondary():
1309 rstats = rpc.call_blockdev_find(node, dev)
1311 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1314 result = result and (not rstats[idx])
1316 for child in dev.children:
1317 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1322 class LUDiagnoseOS(NoHooksLU):
1323 """Logical unit for OS diagnose/query.
1326 _OP_REQP = ["output_fields", "names"]
1328 def CheckPrereq(self):
1329 """Check prerequisites.
1331 This always succeeds, since this is a pure query LU.
1335 raise errors.OpPrereqError("Selective OS query not supported")
1337 self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1338 _CheckOutputFields(static=[],
1339 dynamic=self.dynamic_fields,
1340 selected=self.op.output_fields)
1343 def _DiagnoseByOS(node_list, rlist):
1344 """Remaps a per-node return list into an a per-os per-node dictionary
1347 node_list: a list with the names of all nodes
1348 rlist: a map with node names as keys and OS objects as values
1351 map: a map with osnames as keys and as value another map, with
1353 keys and list of OS objects as values
1354 e.g. {"debian-etch": {"node1": [<object>,...],
1355 "node2": [<object>,]}
1360 for node_name, nr in rlist.iteritems():
1364 if os.name not in all_os:
1365 # build a list of nodes for this os containing empty lists
1366 # for each node in node_list
1367 all_os[os.name] = {}
1368 for nname in node_list:
1369 all_os[os.name][nname] = []
1370 all_os[os.name][node_name].append(os)
1373 def Exec(self, feedback_fn):
1374 """Compute the list of OSes.
1377 node_list = self.cfg.GetNodeList()
1378 node_data = rpc.call_os_diagnose(node_list)
1379 if node_data == False:
1380 raise errors.OpExecError("Can't gather the list of OSes")
1381 pol = self._DiagnoseByOS(node_list, node_data)
1383 for os_name, os_data in pol.iteritems():
1385 for field in self.op.output_fields:
1388 elif field == "valid":
1389 val = utils.all([osl and osl[0] for osl in os_data.values()])
1390 elif field == "node_status":
1392 for node_name, nos_list in os_data.iteritems():
1393 val[node_name] = [(v.status, v.path) for v in nos_list]
1395 raise errors.ParameterError(field)
1402 class LURemoveNode(LogicalUnit):
1403 """Logical unit for removing a node.
1406 HPATH = "node-remove"
1407 HTYPE = constants.HTYPE_NODE
1408 _OP_REQP = ["node_name"]
1410 def BuildHooksEnv(self):
1413 This doesn't run on the target node in the pre phase as a failed
1414 node would not allows itself to run.
1418 "OP_TARGET": self.op.node_name,
1419 "NODE_NAME": self.op.node_name,
1421 all_nodes = self.cfg.GetNodeList()
1422 all_nodes.remove(self.op.node_name)
1423 return env, all_nodes, all_nodes
1425 def CheckPrereq(self):
1426 """Check prerequisites.
1429 - the node exists in the configuration
1430 - it does not have primary or secondary instances
1431 - it's not the master
1433 Any errors are signalled by raising errors.OpPrereqError.
1436 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1438 raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1440 instance_list = self.cfg.GetInstanceList()
1442 masternode = self.sstore.GetMasterNode()
1443 if node.name == masternode:
1444 raise errors.OpPrereqError("Node is the master node,"
1445 " you need to failover first.")
1447 for instance_name in instance_list:
1448 instance = self.cfg.GetInstanceInfo(instance_name)
1449 if node.name == instance.primary_node:
1450 raise errors.OpPrereqError("Instance %s still running on the node,"
1451 " please remove first." % instance_name)
1452 if node.name in instance.secondary_nodes:
1453 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1454 " please remove first." % instance_name)
1455 self.op.node_name = node.name
1458 def Exec(self, feedback_fn):
1459 """Removes the node from the cluster.
1463 logger.Info("stopping the node daemon and removing configs from node %s" %
1466 rpc.call_node_leave_cluster(node.name)
1468 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1470 logger.Info("Removing node %s from config" % node.name)
1472 self.cfg.RemoveNode(node.name)
1474 _RemoveHostFromEtcHosts(node.name)
1477 class LUQueryNodes(NoHooksLU):
1478 """Logical unit for querying nodes.
1481 _OP_REQP = ["output_fields", "names"]
1483 def CheckPrereq(self):
1484 """Check prerequisites.
1486 This checks that the fields required are valid output fields.
1489 self.dynamic_fields = frozenset([
1491 "mtotal", "mnode", "mfree",
1496 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1497 "pinst_list", "sinst_list",
1498 "pip", "sip", "tags"],
1499 dynamic=self.dynamic_fields,
1500 selected=self.op.output_fields)
1502 self.wanted = _GetWantedNodes(self, self.op.names)
1504 def Exec(self, feedback_fn):
1505 """Computes the list of nodes and their attributes.
1508 nodenames = self.wanted
1509 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1511 # begin data gathering
1513 if self.dynamic_fields.intersection(self.op.output_fields):
1515 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1516 for name in nodenames:
1517 nodeinfo = node_data.get(name, None)
1520 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1521 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1522 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1523 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1524 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1525 "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1526 "bootid": nodeinfo['bootid'],
1529 live_data[name] = {}
1531 live_data = dict.fromkeys(nodenames, {})
1533 node_to_primary = dict([(name, set()) for name in nodenames])
1534 node_to_secondary = dict([(name, set()) for name in nodenames])
1536 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1537 "sinst_cnt", "sinst_list"))
1538 if inst_fields & frozenset(self.op.output_fields):
1539 instancelist = self.cfg.GetInstanceList()
1541 for instance_name in instancelist:
1542 inst = self.cfg.GetInstanceInfo(instance_name)
1543 if inst.primary_node in node_to_primary:
1544 node_to_primary[inst.primary_node].add(inst.name)
1545 for secnode in inst.secondary_nodes:
1546 if secnode in node_to_secondary:
1547 node_to_secondary[secnode].add(inst.name)
1549 # end data gathering
1552 for node in nodelist:
1554 for field in self.op.output_fields:
1557 elif field == "pinst_list":
1558 val = list(node_to_primary[node.name])
1559 elif field == "sinst_list":
1560 val = list(node_to_secondary[node.name])
1561 elif field == "pinst_cnt":
1562 val = len(node_to_primary[node.name])
1563 elif field == "sinst_cnt":
1564 val = len(node_to_secondary[node.name])
1565 elif field == "pip":
1566 val = node.primary_ip
1567 elif field == "sip":
1568 val = node.secondary_ip
1569 elif field == "tags":
1570 val = list(node.GetTags())
1571 elif field in self.dynamic_fields:
1572 val = live_data[node.name].get(field, None)
1574 raise errors.ParameterError(field)
1575 node_output.append(val)
1576 output.append(node_output)
1581 class LUQueryNodeVolumes(NoHooksLU):
1582 """Logical unit for getting volumes on node(s).
1585 _OP_REQP = ["nodes", "output_fields"]
1587 def CheckPrereq(self):
1588 """Check prerequisites.
1590 This checks that the fields required are valid output fields.
1593 self.nodes = _GetWantedNodes(self, self.op.nodes)
1595 _CheckOutputFields(static=["node"],
1596 dynamic=["phys", "vg", "name", "size", "instance"],
1597 selected=self.op.output_fields)
1600 def Exec(self, feedback_fn):
1601 """Computes the list of nodes and their attributes.
1604 nodenames = self.nodes
1605 volumes = rpc.call_node_volumes(nodenames)
1607 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1608 in self.cfg.GetInstanceList()]
1610 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1613 for node in nodenames:
1614 if node not in volumes or not volumes[node]:
1617 node_vols = volumes[node][:]
1618 node_vols.sort(key=lambda vol: vol['dev'])
1620 for vol in node_vols:
1622 for field in self.op.output_fields:
1625 elif field == "phys":
1629 elif field == "name":
1631 elif field == "size":
1632 val = int(float(vol['size']))
1633 elif field == "instance":
1635 if node not in lv_by_node[inst]:
1637 if vol['name'] in lv_by_node[inst][node]:
1643 raise errors.ParameterError(field)
1644 node_output.append(str(val))
1646 output.append(node_output)
1651 class LUAddNode(LogicalUnit):
1652 """Logical unit for adding node to the cluster.
1656 HTYPE = constants.HTYPE_NODE
1657 _OP_REQP = ["node_name"]
1659 def BuildHooksEnv(self):
1662 This will run on all nodes before, and on all nodes + the new node after.
1666 "OP_TARGET": self.op.node_name,
1667 "NODE_NAME": self.op.node_name,
1668 "NODE_PIP": self.op.primary_ip,
1669 "NODE_SIP": self.op.secondary_ip,
1671 nodes_0 = self.cfg.GetNodeList()
1672 nodes_1 = nodes_0 + [self.op.node_name, ]
1673 return env, nodes_0, nodes_1
1675 def CheckPrereq(self):
1676 """Check prerequisites.
1679 - the new node is not already in the config
1681 - its parameters (single/dual homed) matches the cluster
1683 Any errors are signalled by raising errors.OpPrereqError.
1686 node_name = self.op.node_name
1689 dns_data = utils.HostInfo(node_name)
1691 node = dns_data.name
1692 primary_ip = self.op.primary_ip = dns_data.ip
1693 secondary_ip = getattr(self.op, "secondary_ip", None)
1694 if secondary_ip is None:
1695 secondary_ip = primary_ip
1696 if not utils.IsValidIP(secondary_ip):
1697 raise errors.OpPrereqError("Invalid secondary IP given")
1698 self.op.secondary_ip = secondary_ip
1700 node_list = cfg.GetNodeList()
1701 if not self.op.readd and node in node_list:
1702 raise errors.OpPrereqError("Node %s is already in the configuration" %
1704 elif self.op.readd and node not in node_list:
1705 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1707 for existing_node_name in node_list:
1708 existing_node = cfg.GetNodeInfo(existing_node_name)
1710 if self.op.readd and node == existing_node_name:
1711 if (existing_node.primary_ip != primary_ip or
1712 existing_node.secondary_ip != secondary_ip):
1713 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1714 " address configuration as before")
1717 if (existing_node.primary_ip == primary_ip or
1718 existing_node.secondary_ip == primary_ip or
1719 existing_node.primary_ip == secondary_ip or
1720 existing_node.secondary_ip == secondary_ip):
1721 raise errors.OpPrereqError("New node ip address(es) conflict with"
1722 " existing node %s" % existing_node.name)
1724 # check that the type of the node (single versus dual homed) is the
1725 # same as for the master
1726 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1727 master_singlehomed = myself.secondary_ip == myself.primary_ip
1728 newbie_singlehomed = secondary_ip == primary_ip
1729 if master_singlehomed != newbie_singlehomed:
1730 if master_singlehomed:
1731 raise errors.OpPrereqError("The master has no private ip but the"
1732 " new node has one")
1734 raise errors.OpPrereqError("The master has a private ip but the"
1735 " new node doesn't have one")
1737 # checks reachablity
1738 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1739 raise errors.OpPrereqError("Node not reachable by ping")
1741 if not newbie_singlehomed:
1742 # check reachability from my secondary ip to newbie's secondary ip
1743 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1744 source=myself.secondary_ip):
1745 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1746 " based ping to noded port")
1748 self.new_node = objects.Node(name=node,
1749 primary_ip=primary_ip,
1750 secondary_ip=secondary_ip)
1752 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1753 if not os.path.exists(constants.VNC_PASSWORD_FILE):
1754 raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1755 constants.VNC_PASSWORD_FILE)
1757 def Exec(self, feedback_fn):
1758 """Adds the new node to the cluster.
1761 new_node = self.new_node
1762 node = new_node.name
1764 # set up inter-node password and certificate and restarts the node daemon
1765 gntpass = self.sstore.GetNodeDaemonPassword()
1766 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1767 raise errors.OpExecError("ganeti password corruption detected")
1768 f = open(constants.SSL_CERT_FILE)
1770 gntpem = f.read(8192)
1773 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1774 # so we use this to detect an invalid certificate; as long as the
1775 # cert doesn't contain this, the here-document will be correctly
1776 # parsed by the shell sequence below
1777 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1778 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1779 if not gntpem.endswith("\n"):
1780 raise errors.OpExecError("PEM must end with newline")
1781 logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1783 # and then connect with ssh to set password and start ganeti-noded
1784 # note that all the below variables are sanitized at this point,
1785 # either by being constants or by the checks above
1787 mycommand = ("umask 077 && "
1788 "echo '%s' > '%s' && "
1789 "cat > '%s' << '!EOF.' && \n"
1790 "%s!EOF.\n%s restart" %
1791 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1792 constants.SSL_CERT_FILE, gntpem,
1793 constants.NODE_INITD_SCRIPT))
1795 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1797 raise errors.OpExecError("Remote command on node %s, error: %s,"
1799 (node, result.fail_reason, result.output))
1801 # check connectivity
1804 result = rpc.call_version([node])[node]
1806 if constants.PROTOCOL_VERSION == result:
1807 logger.Info("communication to node %s fine, sw version %s match" %
1810 raise errors.OpExecError("Version mismatch master version %s,"
1811 " node version %s" %
1812 (constants.PROTOCOL_VERSION, result))
1814 raise errors.OpExecError("Cannot get version from the new node")
1817 logger.Info("copy ssh key to node %s" % node)
1818 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1820 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1821 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1827 keyarray.append(f.read())
1831 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1832 keyarray[3], keyarray[4], keyarray[5])
1835 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1837 # Add node to our /etc/hosts, and add key to known_hosts
1838 _AddHostToEtcHosts(new_node.name)
1840 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1841 self.cfg.GetHostKey())
1843 if new_node.secondary_ip != new_node.primary_ip:
1844 if not rpc.call_node_tcp_ping(new_node.name,
1845 constants.LOCALHOST_IP_ADDRESS,
1846 new_node.secondary_ip,
1847 constants.DEFAULT_NODED_PORT,
1849 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1850 " you gave (%s). Please fix and re-run this"
1851 " command." % new_node.secondary_ip)
1853 success, msg = ssh.VerifyNodeHostname(node)
1855 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1856 " than the one the resolver gives: %s."
1857 " Please fix and re-run this command." %
1860 # Distribute updated /etc/hosts and known_hosts to all nodes,
1861 # including the node just added
1862 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1863 dist_nodes = self.cfg.GetNodeList()
1864 if not self.op.readd:
1865 dist_nodes.append(node)
1866 if myself.name in dist_nodes:
1867 dist_nodes.remove(myself.name)
1869 logger.Debug("Copying hosts and known_hosts to all nodes")
1870 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1871 result = rpc.call_upload_file(dist_nodes, fname)
1872 for to_node in dist_nodes:
1873 if not result[to_node]:
1874 logger.Error("copy of file %s to node %s failed" %
1877 to_copy = ss.GetFileList()
1878 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1879 to_copy.append(constants.VNC_PASSWORD_FILE)
1880 for fname in to_copy:
1881 if not ssh.CopyFileToNode(node, fname):
1882 logger.Error("could not copy file %s to node %s" % (fname, node))
1884 if not self.op.readd:
1885 logger.Info("adding node %s to cluster.conf" % node)
1886 self.cfg.AddNode(new_node)
1889 class LUMasterFailover(LogicalUnit):
1890 """Failover the master node to the current node.
1892 This is a special LU in that it must run on a non-master node.
1895 HPATH = "master-failover"
1896 HTYPE = constants.HTYPE_CLUSTER
1900 def BuildHooksEnv(self):
1903 This will run on the new master only in the pre phase, and on all
1904 the nodes in the post phase.
1908 "OP_TARGET": self.new_master,
1909 "NEW_MASTER": self.new_master,
1910 "OLD_MASTER": self.old_master,
1912 return env, [self.new_master], self.cfg.GetNodeList()
1914 def CheckPrereq(self):
1915 """Check prerequisites.
1917 This checks that we are not already the master.
1920 self.new_master = utils.HostInfo().name
1921 self.old_master = self.sstore.GetMasterNode()
1923 if self.old_master == self.new_master:
1924 raise errors.OpPrereqError("This commands must be run on the node"
1925 " where you want the new master to be."
1926 " %s is already the master" %
1929 def Exec(self, feedback_fn):
1930 """Failover the master node.
1932 This command, when run on a non-master node, will cause the current
1933 master to cease being master, and the non-master to become new
1937 #TODO: do not rely on gethostname returning the FQDN
1938 logger.Info("setting master to %s, old master: %s" %
1939 (self.new_master, self.old_master))
1941 if not rpc.call_node_stop_master(self.old_master):
1942 logger.Error("could disable the master role on the old master"
1943 " %s, please disable manually" % self.old_master)
1946 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1947 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1948 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1949 logger.Error("could not distribute the new simple store master file"
1950 " to the other nodes, please check.")
1952 if not rpc.call_node_start_master(self.new_master):
1953 logger.Error("could not start the master role on the new master"
1954 " %s, please check" % self.new_master)
1955 feedback_fn("Error in activating the master IP on the new master,"
1956 " please fix manually.")
1960 class LUQueryClusterInfo(NoHooksLU):
1961 """Query cluster configuration.
1967 def CheckPrereq(self):
1968 """No prerequsites needed for this LU.
1973 def Exec(self, feedback_fn):
1974 """Return cluster config.
1978 "name": self.sstore.GetClusterName(),
1979 "software_version": constants.RELEASE_VERSION,
1980 "protocol_version": constants.PROTOCOL_VERSION,
1981 "config_version": constants.CONFIG_VERSION,
1982 "os_api_version": constants.OS_API_VERSION,
1983 "export_version": constants.EXPORT_VERSION,
1984 "master": self.sstore.GetMasterNode(),
1985 "architecture": (platform.architecture()[0], platform.machine()),
1986 "hypervisor_type": self.sstore.GetHypervisorType(),
1992 class LUClusterCopyFile(NoHooksLU):
1993 """Copy file to cluster.
1996 _OP_REQP = ["nodes", "filename"]
1998 def CheckPrereq(self):
1999 """Check prerequisites.
2001 It should check that the named file exists and that the given list
2005 if not os.path.exists(self.op.filename):
2006 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2008 self.nodes = _GetWantedNodes(self, self.op.nodes)
2010 def Exec(self, feedback_fn):
2011 """Copy a file from master to some nodes.
2014 opts - class with options as members
2015 args - list containing a single element, the file name
2017 nodes - list containing the name of target nodes; if empty, all nodes
2020 filename = self.op.filename
2022 myname = utils.HostInfo().name
2024 for node in self.nodes:
2027 if not ssh.CopyFileToNode(node, filename):
2028 logger.Error("Copy of file %s to node %s failed" % (filename, node))
2031 class LUDumpClusterConfig(NoHooksLU):
2032 """Return a text-representation of the cluster-config.
2037 def CheckPrereq(self):
2038 """No prerequisites.
2043 def Exec(self, feedback_fn):
2044 """Dump a representation of the cluster config to the standard output.
2047 return self.cfg.DumpConfig()
2050 class LURunClusterCommand(NoHooksLU):
2051 """Run a command on some nodes.
2054 _OP_REQP = ["command", "nodes"]
2056 def CheckPrereq(self):
2057 """Check prerequisites.
2059 It checks that the given list of nodes is valid.
2062 self.nodes = _GetWantedNodes(self, self.op.nodes)
2064 def Exec(self, feedback_fn):
2065 """Run a command on some nodes.
2068 # put the master at the end of the nodes list
2069 master_node = self.sstore.GetMasterNode()
2070 if master_node in self.nodes:
2071 self.nodes.remove(master_node)
2072 self.nodes.append(master_node)
2075 for node in self.nodes:
2076 result = ssh.SSHCall(node, "root", self.op.command)
2077 data.append((node, result.output, result.exit_code))
2082 class LUActivateInstanceDisks(NoHooksLU):
2083 """Bring up an instance's disks.
2086 _OP_REQP = ["instance_name"]
2088 def CheckPrereq(self):
2089 """Check prerequisites.
2091 This checks that the instance is in the cluster.
2094 instance = self.cfg.GetInstanceInfo(
2095 self.cfg.ExpandInstanceName(self.op.instance_name))
2096 if instance is None:
2097 raise errors.OpPrereqError("Instance '%s' not known" %
2098 self.op.instance_name)
2099 self.instance = instance
2102 def Exec(self, feedback_fn):
2103 """Activate the disks.
2106 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2108 raise errors.OpExecError("Cannot activate block devices")
2113 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2114 """Prepare the block devices for an instance.
2116 This sets up the block devices on all nodes.
2119 instance: a ganeti.objects.Instance object
2120 ignore_secondaries: if true, errors on secondary nodes won't result
2121 in an error return from the function
2124 false if the operation failed
2125 list of (host, instance_visible_name, node_visible_name) if the operation
2126 suceeded with the mapping from node devices to instance devices
2130 iname = instance.name
2131 # With the two passes mechanism we try to reduce the window of
2132 # opportunity for the race condition of switching DRBD to primary
2133 # before handshaking occured, but we do not eliminate it
2135 # The proper fix would be to wait (with some limits) until the
2136 # connection has been made and drbd transitions from WFConnection
2137 # into any other network-connected state (Connected, SyncTarget,
2140 # 1st pass, assemble on all nodes in secondary mode
2141 for inst_disk in instance.disks:
2142 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2143 cfg.SetDiskID(node_disk, node)
2144 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2146 logger.Error("could not prepare block device %s on node %s"
2147 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2148 if not ignore_secondaries:
2151 # FIXME: race condition on drbd migration to primary
2153 # 2nd pass, do only the primary node
2154 for inst_disk in instance.disks:
2155 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2156 if node != instance.primary_node:
2158 cfg.SetDiskID(node_disk, node)
2159 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2161 logger.Error("could not prepare block device %s on node %s"
2162 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2164 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2166 # leave the disks configured for the primary node
2167 # this is a workaround that would be fixed better by
2168 # improving the logical/physical id handling
2169 for disk in instance.disks:
2170 cfg.SetDiskID(disk, instance.primary_node)
2172 return disks_ok, device_info
2175 def _StartInstanceDisks(cfg, instance, force):
2176 """Start the disks of an instance.
2179 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2180 ignore_secondaries=force)
2182 _ShutdownInstanceDisks(instance, cfg)
2183 if force is not None and not force:
2184 logger.Error("If the message above refers to a secondary node,"
2185 " you can retry the operation using '--force'.")
2186 raise errors.OpExecError("Disk consistency error")
2189 class LUDeactivateInstanceDisks(NoHooksLU):
2190 """Shutdown an instance's disks.
2193 _OP_REQP = ["instance_name"]
2195 def CheckPrereq(self):
2196 """Check prerequisites.
2198 This checks that the instance is in the cluster.
2201 instance = self.cfg.GetInstanceInfo(
2202 self.cfg.ExpandInstanceName(self.op.instance_name))
2203 if instance is None:
2204 raise errors.OpPrereqError("Instance '%s' not known" %
2205 self.op.instance_name)
2206 self.instance = instance
2208 def Exec(self, feedback_fn):
2209 """Deactivate the disks
2212 instance = self.instance
2213 ins_l = rpc.call_instance_list([instance.primary_node])
2214 ins_l = ins_l[instance.primary_node]
2215 if not type(ins_l) is list:
2216 raise errors.OpExecError("Can't contact node '%s'" %
2217 instance.primary_node)
2219 if self.instance.name in ins_l:
2220 raise errors.OpExecError("Instance is running, can't shutdown"
2223 _ShutdownInstanceDisks(instance, self.cfg)
2226 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2227 """Shutdown block devices of an instance.
2229 This does the shutdown on all nodes of the instance.
2231 If the ignore_primary is false, errors on the primary node are
2236 for disk in instance.disks:
2237 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2238 cfg.SetDiskID(top_disk, node)
2239 if not rpc.call_blockdev_shutdown(node, top_disk):
2240 logger.Error("could not shutdown block device %s on node %s" %
2241 (disk.iv_name, node))
2242 if not ignore_primary or node != instance.primary_node:
2247 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2248 """Checks if a node has enough free memory.
2250 This function check if a given node has the needed amount of free
2251 memory. In case the node has less memory or we cannot get the
2252 information from the node, this function raise an OpPrereqError
2256 - cfg: a ConfigWriter instance
2257 - node: the node name
2258 - reason: string to use in the error message
2259 - requested: the amount of memory in MiB
2262 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2263 if not nodeinfo or not isinstance(nodeinfo, dict):
2264 raise errors.OpPrereqError("Could not contact node %s for resource"
2265 " information" % (node,))
2267 free_mem = nodeinfo[node].get('memory_free')
2268 if not isinstance(free_mem, int):
2269 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2270 " was '%s'" % (node, free_mem))
2271 if requested > free_mem:
2272 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2273 " needed %s MiB, available %s MiB" %
2274 (node, reason, requested, free_mem))
2277 class LUStartupInstance(LogicalUnit):
2278 """Starts an instance.
2281 HPATH = "instance-start"
2282 HTYPE = constants.HTYPE_INSTANCE
2283 _OP_REQP = ["instance_name", "force"]
2285 def BuildHooksEnv(self):
2288 This runs on master, primary and secondary nodes of the instance.
2292 "FORCE": self.op.force,
2294 env.update(_BuildInstanceHookEnvByObject(self.instance))
2295 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2296 list(self.instance.secondary_nodes))
2299 def CheckPrereq(self):
2300 """Check prerequisites.
2302 This checks that the instance is in the cluster.
2305 instance = self.cfg.GetInstanceInfo(
2306 self.cfg.ExpandInstanceName(self.op.instance_name))
2307 if instance is None:
2308 raise errors.OpPrereqError("Instance '%s' not known" %
2309 self.op.instance_name)
2311 # check bridges existance
2312 _CheckInstanceBridgesExist(instance)
2314 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2315 "starting instance %s" % instance.name,
2318 self.instance = instance
2319 self.op.instance_name = instance.name
2321 def Exec(self, feedback_fn):
2322 """Start the instance.
2325 instance = self.instance
2326 force = self.op.force
2327 extra_args = getattr(self.op, "extra_args", "")
2329 self.cfg.MarkInstanceUp(instance.name)
2331 node_current = instance.primary_node
2333 _StartInstanceDisks(self.cfg, instance, force)
2335 if not rpc.call_instance_start(node_current, instance, extra_args):
2336 _ShutdownInstanceDisks(instance, self.cfg)
2337 raise errors.OpExecError("Could not start instance")
2340 class LURebootInstance(LogicalUnit):
2341 """Reboot an instance.
2344 HPATH = "instance-reboot"
2345 HTYPE = constants.HTYPE_INSTANCE
2346 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2348 def BuildHooksEnv(self):
2351 This runs on master, primary and secondary nodes of the instance.
2355 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2357 env.update(_BuildInstanceHookEnvByObject(self.instance))
2358 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2359 list(self.instance.secondary_nodes))
2362 def CheckPrereq(self):
2363 """Check prerequisites.
2365 This checks that the instance is in the cluster.
2368 instance = self.cfg.GetInstanceInfo(
2369 self.cfg.ExpandInstanceName(self.op.instance_name))
2370 if instance is None:
2371 raise errors.OpPrereqError("Instance '%s' not known" %
2372 self.op.instance_name)
2374 # check bridges existance
2375 _CheckInstanceBridgesExist(instance)
2377 self.instance = instance
2378 self.op.instance_name = instance.name
2380 def Exec(self, feedback_fn):
2381 """Reboot the instance.
2384 instance = self.instance
2385 ignore_secondaries = self.op.ignore_secondaries
2386 reboot_type = self.op.reboot_type
2387 extra_args = getattr(self.op, "extra_args", "")
2389 node_current = instance.primary_node
2391 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2392 constants.INSTANCE_REBOOT_HARD,
2393 constants.INSTANCE_REBOOT_FULL]:
2394 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2395 (constants.INSTANCE_REBOOT_SOFT,
2396 constants.INSTANCE_REBOOT_HARD,
2397 constants.INSTANCE_REBOOT_FULL))
2399 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2400 constants.INSTANCE_REBOOT_HARD]:
2401 if not rpc.call_instance_reboot(node_current, instance,
2402 reboot_type, extra_args):
2403 raise errors.OpExecError("Could not reboot instance")
2405 if not rpc.call_instance_shutdown(node_current, instance):
2406 raise errors.OpExecError("could not shutdown instance for full reboot")
2407 _ShutdownInstanceDisks(instance, self.cfg)
2408 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2409 if not rpc.call_instance_start(node_current, instance, extra_args):
2410 _ShutdownInstanceDisks(instance, self.cfg)
2411 raise errors.OpExecError("Could not start instance for full reboot")
2413 self.cfg.MarkInstanceUp(instance.name)
2416 class LUShutdownInstance(LogicalUnit):
2417 """Shutdown an instance.
2420 HPATH = "instance-stop"
2421 HTYPE = constants.HTYPE_INSTANCE
2422 _OP_REQP = ["instance_name"]
2424 def BuildHooksEnv(self):
2427 This runs on master, primary and secondary nodes of the instance.
2430 env = _BuildInstanceHookEnvByObject(self.instance)
2431 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2432 list(self.instance.secondary_nodes))
2435 def CheckPrereq(self):
2436 """Check prerequisites.
2438 This checks that the instance is in the cluster.
2441 instance = self.cfg.GetInstanceInfo(
2442 self.cfg.ExpandInstanceName(self.op.instance_name))
2443 if instance is None:
2444 raise errors.OpPrereqError("Instance '%s' not known" %
2445 self.op.instance_name)
2446 self.instance = instance
2448 def Exec(self, feedback_fn):
2449 """Shutdown the instance.
2452 instance = self.instance
2453 node_current = instance.primary_node
2454 self.cfg.MarkInstanceDown(instance.name)
2455 if not rpc.call_instance_shutdown(node_current, instance):
2456 logger.Error("could not shutdown instance")
2458 _ShutdownInstanceDisks(instance, self.cfg)
2461 class LUReinstallInstance(LogicalUnit):
2462 """Reinstall an instance.
2465 HPATH = "instance-reinstall"
2466 HTYPE = constants.HTYPE_INSTANCE
2467 _OP_REQP = ["instance_name"]
2469 def BuildHooksEnv(self):
2472 This runs on master, primary and secondary nodes of the instance.
2475 env = _BuildInstanceHookEnvByObject(self.instance)
2476 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2477 list(self.instance.secondary_nodes))
2480 def CheckPrereq(self):
2481 """Check prerequisites.
2483 This checks that the instance is in the cluster and is not running.
2486 instance = self.cfg.GetInstanceInfo(
2487 self.cfg.ExpandInstanceName(self.op.instance_name))
2488 if instance is None:
2489 raise errors.OpPrereqError("Instance '%s' not known" %
2490 self.op.instance_name)
2491 if instance.disk_template == constants.DT_DISKLESS:
2492 raise errors.OpPrereqError("Instance '%s' has no disks" %
2493 self.op.instance_name)
2494 if instance.status != "down":
2495 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2496 self.op.instance_name)
2497 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2499 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2500 (self.op.instance_name,
2501 instance.primary_node))
2503 self.op.os_type = getattr(self.op, "os_type", None)
2504 if self.op.os_type is not None:
2506 pnode = self.cfg.GetNodeInfo(
2507 self.cfg.ExpandNodeName(instance.primary_node))
2509 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2511 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2513 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2514 " primary node" % self.op.os_type)
2516 self.instance = instance
2518 def Exec(self, feedback_fn):
2519 """Reinstall the instance.
2522 inst = self.instance
2524 if self.op.os_type is not None:
2525 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2526 inst.os = self.op.os_type
2527 self.cfg.AddInstance(inst)
2529 _StartInstanceDisks(self.cfg, inst, None)
2531 feedback_fn("Running the instance OS create scripts...")
2532 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2533 raise errors.OpExecError("Could not install OS for instance %s"
2535 (inst.name, inst.primary_node))
2537 _ShutdownInstanceDisks(inst, self.cfg)
2540 class LURenameInstance(LogicalUnit):
2541 """Rename an instance.
2544 HPATH = "instance-rename"
2545 HTYPE = constants.HTYPE_INSTANCE
2546 _OP_REQP = ["instance_name", "new_name"]
2548 def BuildHooksEnv(self):
2551 This runs on master, primary and secondary nodes of the instance.
2554 env = _BuildInstanceHookEnvByObject(self.instance)
2555 env["INSTANCE_NEW_NAME"] = self.op.new_name
2556 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2557 list(self.instance.secondary_nodes))
2560 def CheckPrereq(self):
2561 """Check prerequisites.
2563 This checks that the instance is in the cluster and is not running.
2566 instance = self.cfg.GetInstanceInfo(
2567 self.cfg.ExpandInstanceName(self.op.instance_name))
2568 if instance is None:
2569 raise errors.OpPrereqError("Instance '%s' not known" %
2570 self.op.instance_name)
2571 if instance.status != "down":
2572 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2573 self.op.instance_name)
2574 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2576 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2577 (self.op.instance_name,
2578 instance.primary_node))
2579 self.instance = instance
2581 # new name verification
2582 name_info = utils.HostInfo(self.op.new_name)
2584 self.op.new_name = new_name = name_info.name
2585 instance_list = self.cfg.GetInstanceList()
2586 if new_name in instance_list:
2587 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2590 if not getattr(self.op, "ignore_ip", False):
2591 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2592 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2593 (name_info.ip, new_name))
2596 def Exec(self, feedback_fn):
2597 """Reinstall the instance.
2600 inst = self.instance
2601 old_name = inst.name
2603 self.cfg.RenameInstance(inst.name, self.op.new_name)
2605 # re-read the instance from the configuration after rename
2606 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2608 _StartInstanceDisks(self.cfg, inst, None)
2610 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2612 msg = ("Could run OS rename script for instance %s on node %s (but the"
2613 " instance has been renamed in Ganeti)" %
2614 (inst.name, inst.primary_node))
2617 _ShutdownInstanceDisks(inst, self.cfg)
2620 class LURemoveInstance(LogicalUnit):
2621 """Remove an instance.
2624 HPATH = "instance-remove"
2625 HTYPE = constants.HTYPE_INSTANCE
2626 _OP_REQP = ["instance_name", "ignore_failures"]
2628 def BuildHooksEnv(self):
2631 This runs on master, primary and secondary nodes of the instance.
2634 env = _BuildInstanceHookEnvByObject(self.instance)
2635 nl = [self.sstore.GetMasterNode()]
2638 def CheckPrereq(self):
2639 """Check prerequisites.
2641 This checks that the instance is in the cluster.
2644 instance = self.cfg.GetInstanceInfo(
2645 self.cfg.ExpandInstanceName(self.op.instance_name))
2646 if instance is None:
2647 raise errors.OpPrereqError("Instance '%s' not known" %
2648 self.op.instance_name)
2649 self.instance = instance
2651 def Exec(self, feedback_fn):
2652 """Remove the instance.
2655 instance = self.instance
2656 logger.Info("shutting down instance %s on node %s" %
2657 (instance.name, instance.primary_node))
2659 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2660 if self.op.ignore_failures:
2661 feedback_fn("Warning: can't shutdown instance")
2663 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2664 (instance.name, instance.primary_node))
2666 logger.Info("removing block devices for instance %s" % instance.name)
2668 if not _RemoveDisks(instance, self.cfg):
2669 if self.op.ignore_failures:
2670 feedback_fn("Warning: can't remove instance's disks")
2672 raise errors.OpExecError("Can't remove instance's disks")
2674 logger.Info("removing instance %s out of cluster config" % instance.name)
2676 self.cfg.RemoveInstance(instance.name)
2679 class LUQueryInstances(NoHooksLU):
2680 """Logical unit for querying instances.
2683 _OP_REQP = ["output_fields", "names"]
2685 def CheckPrereq(self):
2686 """Check prerequisites.
2688 This checks that the fields required are valid output fields.
2691 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2692 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2693 "admin_state", "admin_ram",
2694 "disk_template", "ip", "mac", "bridge",
2695 "sda_size", "sdb_size", "vcpus", "tags"],
2696 dynamic=self.dynamic_fields,
2697 selected=self.op.output_fields)
2699 self.wanted = _GetWantedInstances(self, self.op.names)
2701 def Exec(self, feedback_fn):
2702 """Computes the list of nodes and their attributes.
2705 instance_names = self.wanted
2706 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2709 # begin data gathering
2711 nodes = frozenset([inst.primary_node for inst in instance_list])
2714 if self.dynamic_fields.intersection(self.op.output_fields):
2716 node_data = rpc.call_all_instances_info(nodes)
2718 result = node_data[name]
2720 live_data.update(result)
2721 elif result == False:
2722 bad_nodes.append(name)
2723 # else no instance is alive
2725 live_data = dict([(name, {}) for name in instance_names])
2727 # end data gathering
2730 for instance in instance_list:
2732 for field in self.op.output_fields:
2737 elif field == "pnode":
2738 val = instance.primary_node
2739 elif field == "snodes":
2740 val = list(instance.secondary_nodes)
2741 elif field == "admin_state":
2742 val = (instance.status != "down")
2743 elif field == "oper_state":
2744 if instance.primary_node in bad_nodes:
2747 val = bool(live_data.get(instance.name))
2748 elif field == "status":
2749 if instance.primary_node in bad_nodes:
2750 val = "ERROR_nodedown"
2752 running = bool(live_data.get(instance.name))
2754 if instance.status != "down":
2759 if instance.status != "down":
2763 elif field == "admin_ram":
2764 val = instance.memory
2765 elif field == "oper_ram":
2766 if instance.primary_node in bad_nodes:
2768 elif instance.name in live_data:
2769 val = live_data[instance.name].get("memory", "?")
2772 elif field == "disk_template":
2773 val = instance.disk_template
2775 val = instance.nics[0].ip
2776 elif field == "bridge":
2777 val = instance.nics[0].bridge
2778 elif field == "mac":
2779 val = instance.nics[0].mac
2780 elif field == "sda_size" or field == "sdb_size":
2781 disk = instance.FindDisk(field[:3])
2786 elif field == "vcpus":
2787 val = instance.vcpus
2788 elif field == "tags":
2789 val = list(instance.GetTags())
2791 raise errors.ParameterError(field)
2798 class LUFailoverInstance(LogicalUnit):
2799 """Failover an instance.
2802 HPATH = "instance-failover"
2803 HTYPE = constants.HTYPE_INSTANCE
2804 _OP_REQP = ["instance_name", "ignore_consistency"]
2806 def BuildHooksEnv(self):
2809 This runs on master, primary and secondary nodes of the instance.
2813 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2815 env.update(_BuildInstanceHookEnvByObject(self.instance))
2816 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2819 def CheckPrereq(self):
2820 """Check prerequisites.
2822 This checks that the instance is in the cluster.
2825 instance = self.cfg.GetInstanceInfo(
2826 self.cfg.ExpandInstanceName(self.op.instance_name))
2827 if instance is None:
2828 raise errors.OpPrereqError("Instance '%s' not known" %
2829 self.op.instance_name)
2831 if instance.disk_template not in constants.DTS_NET_MIRROR:
2832 raise errors.OpPrereqError("Instance's disk layout is not"
2833 " network mirrored, cannot failover.")
2835 secondary_nodes = instance.secondary_nodes
2836 if not secondary_nodes:
2837 raise errors.ProgrammerError("no secondary node but using "
2838 "DT_REMOTE_RAID1 template")
2840 target_node = secondary_nodes[0]
2841 # check memory requirements on the secondary node
2842 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2843 instance.name, instance.memory)
2845 # check bridge existance
2846 brlist = [nic.bridge for nic in instance.nics]
2847 if not rpc.call_bridges_exist(target_node, brlist):
2848 raise errors.OpPrereqError("One or more target bridges %s does not"
2849 " exist on destination node '%s'" %
2850 (brlist, target_node))
2852 self.instance = instance
2854 def Exec(self, feedback_fn):
2855 """Failover an instance.
2857 The failover is done by shutting it down on its present node and
2858 starting it on the secondary.
2861 instance = self.instance
2863 source_node = instance.primary_node
2864 target_node = instance.secondary_nodes[0]
2866 feedback_fn("* checking disk consistency between source and target")
2867 for dev in instance.disks:
2868 # for remote_raid1, these are md over drbd
2869 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2870 if instance.status == "up" and not self.op.ignore_consistency:
2871 raise errors.OpExecError("Disk %s is degraded on target node,"
2872 " aborting failover." % dev.iv_name)
2874 feedback_fn("* shutting down instance on source node")
2875 logger.Info("Shutting down instance %s on node %s" %
2876 (instance.name, source_node))
2878 if not rpc.call_instance_shutdown(source_node, instance):
2879 if self.op.ignore_consistency:
2880 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2881 " anyway. Please make sure node %s is down" %
2882 (instance.name, source_node, source_node))
2884 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2885 (instance.name, source_node))
2887 feedback_fn("* deactivating the instance's disks on source node")
2888 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2889 raise errors.OpExecError("Can't shut down the instance's disks.")
2891 instance.primary_node = target_node
2892 # distribute new instance config to the other nodes
2893 self.cfg.Update(instance)
2895 # Only start the instance if it's marked as up
2896 if instance.status == "up":
2897 feedback_fn("* activating the instance's disks on target node")
2898 logger.Info("Starting instance %s on node %s" %
2899 (instance.name, target_node))
2901 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2902 ignore_secondaries=True)
2904 _ShutdownInstanceDisks(instance, self.cfg)
2905 raise errors.OpExecError("Can't activate the instance's disks")
2907 feedback_fn("* starting the instance on the target node")
2908 if not rpc.call_instance_start(target_node, instance, None):
2909 _ShutdownInstanceDisks(instance, self.cfg)
2910 raise errors.OpExecError("Could not start instance %s on node %s." %
2911 (instance.name, target_node))
2914 class LUMigrateInstance(LogicalUnit):
2915 """Migrate an instance.
2917 This is migration without shutting down, compared to the failover,
2918 which is done with shutdown.
2921 HPATH = "instance-migrate"
2922 HTYPE = constants.HTYPE_INSTANCE
2923 _OP_REQP = ["instance_name", "live", "cleanup"]
2925 def BuildHooksEnv(self):
2928 This runs on master, primary and secondary nodes of the instance.
2931 env = _BuildInstanceHookEnvByObject(self.instance)
2932 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2935 def CheckPrereq(self):
2936 """Check prerequisites.
2938 This checks that the instance is in the cluster.
2941 instance = self.cfg.GetInstanceInfo(
2942 self.cfg.ExpandInstanceName(self.op.instance_name))
2943 if instance is None:
2944 raise errors.OpPrereqError("Instance '%s' not known" %
2945 self.op.instance_name)
2947 if instance.disk_template != constants.DT_DRBD8:
2948 raise errors.OpPrereqError("Instance's disk layout is not"
2949 " drbd8, cannot migrate.")
2951 secondary_nodes = instance.secondary_nodes
2952 if not secondary_nodes:
2953 raise errors.ProgrammerError("no secondary node but using "
2954 "drbd8 disk template")
2956 target_node = secondary_nodes[0]
2957 # check memory requirements on the secondary node
2958 _CheckNodeFreeMemory(self.cfg, target_node, "migrating instance %s" %
2959 instance.name, instance.memory)
2961 # check bridge existance
2962 brlist = [nic.bridge for nic in instance.nics]
2963 if not rpc.call_bridges_exist(target_node, brlist):
2964 raise errors.OpPrereqError("One or more target bridges %s does not"
2965 " exist on destination node '%s'" %
2966 (brlist, target_node))
2968 if not self.op.cleanup:
2969 migratable = rpc.call_instance_migratable(instance.primary_node,
2972 raise errors.OpPrereqError("Can't contact node '%s'" %
2973 instance.primary_node)
2974 if not migratable[0]:
2975 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
2978 self.instance = instance
2980 def _WaitUntilSync(self):
2981 """Poll with custom rpc for disk sync.
2983 This uses our own step-based rpc call.
2986 self.feedback_fn("* wait until resync is done")
2990 result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
2991 self.instance.disks,
2992 self.nodes_ip, False,
2993 constants.DRBD_RECONF_RPC_WFSYNC)
2995 for node in self.all_nodes:
2996 if not result[node] or not result[node][0]:
2997 raise errors.OpExecError("Cannot resync disks on node %s" % (node,))
2998 node_done, node_percent = result[node][1]
2999 all_done = all_done and node_done
3000 if node_percent is not None:
3001 min_percent = min(min_percent, node_percent)
3003 if min_percent < 100:
3004 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3007 def _EnsureSecondary(self, node):
3008 """Demote a node to secondary.
3011 self.feedback_fn("* switching node %s to secondary mode" % node)
3012 result = rpc.call_drbd_reconfig_net([node], self.instance.name,
3013 self.instance.disks,
3014 self.nodes_ip, False,
3015 constants.DRBD_RECONF_RPC_SECONDARY)
3016 if not result[node] or not result[node][0]:
3017 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3019 (node, result[node][1]))
3021 def _GoStandalone(self):
3022 """Disconnect from the network.
3025 self.feedback_fn("* changing into standalone mode")
3026 result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3027 self.instance.disks,
3028 self.nodes_ip, True,
3029 constants.DRBD_RECONF_RPC_DISCONNECT)
3030 for node in self.all_nodes:
3031 if not result[node] or not result[node][0]:
3032 raise errors.OpExecError("Cannot disconnect disks node %s,"
3033 " error %s" % (node, result[node][1]))
3035 def _GoReconnect(self, multimaster):
3036 """Reconnect to the network.
3042 msg = "single-master"
3043 self.feedback_fn("* changing disks into %s mode" % msg)
3044 result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3045 self.instance.disks,
3048 constants.DRBD_RECONF_RPC_RECONNECT)
3049 for node in self.all_nodes:
3050 if not result[node] or not result[node][0]:
3051 raise errors.OpExecError("Cannot change disks config on node %s,"
3052 " error %s" % (node, result[node][1]))
3054 def _IdentifyDisks(self):
3055 """Start the migration RPC sequence.
3058 self.feedback_fn("* identifying disks")
3059 result = rpc.call_drbd_reconfig_net(self.all_nodes,
3061 self.instance.disks,
3062 self.nodes_ip, True,
3063 constants.DRBD_RECONF_RPC_INIT)
3064 for node in self.all_nodes:
3065 if not result[node] or not result[node][0]:
3066 raise errors.OpExecError("Cannot identify disks node %s,"
3067 " error %s" % (node, result[node][1]))
3069 def _ExecCleanup(self):
3070 """Try to cleanup after a failed migration.
3072 The cleanup is done by:
3073 - check that the instance is running only on one node
3074 (and update the config if needed)
3075 - change disks on its secondary node to secondary
3076 - wait until disks are fully synchronized
3077 - disconnect from the network
3078 - change disks into single-master mode
3079 - wait again until disks are fully synchronized
3082 instance = self.instance
3083 target_node = self.target_node
3084 source_node = self.source_node
3086 # check running on only one node
3087 self.feedback_fn("* checking where the instance actually runs"
3088 " (if this hangs, the hypervisor might be in"
3090 ins_l = rpc.call_instance_list(self.all_nodes)
3091 for node in self.all_nodes:
3092 if not type(ins_l[node]) is list:
3093 raise errors.OpExecError("Can't contact node '%s'" % node)
3095 runningon_source = instance.name in ins_l[source_node]
3096 runningon_target = instance.name in ins_l[target_node]
3098 if runningon_source and runningon_target:
3099 raise errors.OpExecError("Instance seems to be running on two nodes,"
3100 " or the hypervisor is confused. You will have"
3101 " to ensure manually that it runs only on one"
3102 " and restart this operation.")
3104 if not (runningon_source or runningon_target):
3105 raise errors.OpExecError("Instance does not seem to be running at all."
3106 " In this case, it's safer to repair by"
3107 " running 'gnt-instance stop' to ensure disk"
3108 " shutdown, and then restarting it.")
3110 if runningon_target:
3111 # the migration has actually succeeded, we need to update the config
3112 self.feedback_fn("* instance running on secondary node (%s),"
3113 " updating config" % target_node)
3114 instance.primary_node = target_node
3115 self.cfg.Update(instance)
3116 demoted_node = source_node
3118 self.feedback_fn("* instance confirmed to be running on its"
3119 " primary node (%s)" % source_node)
3120 demoted_node = target_node
3122 self._IdentifyDisks()
3124 self._EnsureSecondary(demoted_node)
3125 self._WaitUntilSync()
3126 self._GoStandalone()
3127 self._GoReconnect(False)
3128 self._WaitUntilSync()
3130 self.feedback_fn("* done")
3132 def _ExecMigration(self):
3133 """Migrate an instance.
3135 The migrate is done by:
3136 - change the disks into dual-master mode
3137 - wait until disks are fully synchronized again
3138 - migrate the instance
3139 - change disks on the new secondary node (the old primary) to secondary
3140 - wait until disks are fully synchronized
3141 - change disks into single-master mode
3144 instance = self.instance
3145 target_node = self.target_node
3146 source_node = self.source_node
3148 self.feedback_fn("* checking disk consistency between source and target")
3149 for dev in instance.disks:
3150 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
3151 raise errors.OpExecError("Disk %s is degraded or not fully"
3152 " synchronized on target node,"
3153 " aborting migrate." % dev.iv_name)
3155 self._IdentifyDisks()
3157 self._EnsureSecondary(target_node)
3158 self._GoStandalone()
3159 self._GoReconnect(True)
3160 self._WaitUntilSync()
3162 self.feedback_fn("* migrating instance to %s" % target_node)
3164 result = rpc.call_instance_migrate(source_node, instance,
3165 self.nodes_ip[target_node],
3167 if not result or not result[0]:
3168 logger.Error("Instance migration failed, trying to revert disk status")
3170 self._EnsureSecondary(target_node)
3171 self._GoStandalone()
3172 self._GoReconnect(False)
3173 self._WaitUntilSync()
3174 except errors.OpExecError, err:
3175 logger.Error("Can't reconnect the drives: error '%s'\n"
3176 "Please look and recover the instance status" % str(err))
3178 raise errors.OpExecError("Could not migrate instance %s: %s" %
3179 (instance.name, result[1]))
3182 instance.primary_node = target_node
3183 # distribute new instance config to the other nodes
3184 self.cfg.Update(instance)
3186 self._EnsureSecondary(source_node)
3187 self._WaitUntilSync()
3188 self._GoStandalone()
3189 self._GoReconnect(False)
3190 self._WaitUntilSync()
3192 self.feedback_fn("* done")
3194 def Exec(self, feedback_fn):
3195 """Perform the migration.
3198 self.feedback_fn = feedback_fn
3200 self.source_node = self.instance.primary_node
3201 self.target_node = self.instance.secondary_nodes[0]
3202 self.all_nodes = [self.source_node, self.target_node]
3204 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3205 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3208 return self._ExecCleanup()
3210 return self._ExecMigration()
3213 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
3214 """Create a tree of block devices on the primary node.
3216 This always creates all devices.
3220 for child in device.children:
3221 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
3224 cfg.SetDiskID(device, node)
3225 new_id = rpc.call_blockdev_create(node, device, device.size,
3226 instance.name, True, info)
3229 if device.physical_id is None:
3230 device.physical_id = new_id
3234 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
3235 """Create a tree of block devices on a secondary node.
3237 If this device type has to be created on secondaries, create it and
3240 If not, just recurse to children keeping the same 'force' value.
3243 if device.CreateOnSecondary():
3246 for child in device.children:
3247 if not _CreateBlockDevOnSecondary(cfg, node, instance,
3248 child, force, info):
3253 cfg.SetDiskID(device, node)
3254 new_id = rpc.call_blockdev_create(node, device, device.size,
3255 instance.name, False, info)
3258 if device.physical_id is None:
3259 device.physical_id = new_id
3263 def _GenerateUniqueNames(cfg, exts):
3264 """Generate a suitable LV name.
3266 This will generate a logical volume name for the given instance.
3271 new_id = cfg.GenerateUniqueID()
3272 results.append("%s%s" % (new_id, val))
3276 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
3277 """Generate a drbd device complete with its children.
3280 port = cfg.AllocatePort()
3281 vgname = cfg.GetVGName()
3282 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3283 logical_id=(vgname, names[0]))
3284 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3285 logical_id=(vgname, names[1]))
3286 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
3287 logical_id = (primary, secondary, port),
3288 children = [dev_data, dev_meta])
3292 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
3293 """Generate a drbd8 device complete with its children.
3296 port = cfg.AllocatePort()
3297 vgname = cfg.GetVGName()
3298 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3299 logical_id=(vgname, names[0]))
3300 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3301 logical_id=(vgname, names[1]))
3302 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3303 logical_id = (primary, secondary, port),
3304 children = [dev_data, dev_meta],
3308 def _GenerateDiskTemplate(cfg, template_name,
3309 instance_name, primary_node,
3310 secondary_nodes, disk_sz, swap_sz):
3311 """Generate the entire disk layout for a given template type.
3314 #TODO: compute space requirements
3316 vgname = cfg.GetVGName()
3317 if template_name == constants.DT_DISKLESS:
3319 elif template_name == constants.DT_PLAIN:
3320 if len(secondary_nodes) != 0:
3321 raise errors.ProgrammerError("Wrong template configuration")
3323 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3324 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3325 logical_id=(vgname, names[0]),
3327 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3328 logical_id=(vgname, names[1]),
3330 disks = [sda_dev, sdb_dev]
3331 elif template_name == constants.DT_LOCAL_RAID1:
3332 if len(secondary_nodes) != 0:
3333 raise errors.ProgrammerError("Wrong template configuration")
3336 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
3337 ".sdb_m1", ".sdb_m2"])
3338 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3339 logical_id=(vgname, names[0]))
3340 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3341 logical_id=(vgname, names[1]))
3342 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
3344 children = [sda_dev_m1, sda_dev_m2])
3345 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3346 logical_id=(vgname, names[2]))
3347 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3348 logical_id=(vgname, names[3]))
3349 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
3351 children = [sdb_dev_m1, sdb_dev_m2])
3352 disks = [md_sda_dev, md_sdb_dev]
3353 elif template_name == constants.DT_REMOTE_RAID1:
3354 if len(secondary_nodes) != 1:
3355 raise errors.ProgrammerError("Wrong template configuration")
3356 remote_node = secondary_nodes[0]
3357 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3358 ".sdb_data", ".sdb_meta"])
3359 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3360 disk_sz, names[0:2])
3361 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
3362 children = [drbd_sda_dev], size=disk_sz)
3363 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3364 swap_sz, names[2:4])
3365 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
3366 children = [drbd_sdb_dev], size=swap_sz)
3367 disks = [md_sda_dev, md_sdb_dev]
3368 elif template_name == constants.DT_DRBD8:
3369 if len(secondary_nodes) != 1:
3370 raise errors.ProgrammerError("Wrong template configuration")
3371 remote_node = secondary_nodes[0]
3372 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3373 ".sdb_data", ".sdb_meta"])
3374 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3375 disk_sz, names[0:2], "sda")
3376 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3377 swap_sz, names[2:4], "sdb")
3378 disks = [drbd_sda_dev, drbd_sdb_dev]
3380 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3384 def _GetInstanceInfoText(instance):
3385 """Compute that text that should be added to the disk's metadata.
3388 return "originstname+%s" % instance.name
3391 def _CreateDisks(cfg, instance):
3392 """Create all disks for an instance.
3394 This abstracts away some work from AddInstance.
3397 instance: the instance object
3400 True or False showing the success of the creation process
3403 info = _GetInstanceInfoText(instance)
3405 for device in instance.disks:
3406 logger.Info("creating volume %s for instance %s" %
3407 (device.iv_name, instance.name))
3409 for secondary_node in instance.secondary_nodes:
3410 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3411 device, False, info):
3412 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3413 (device.iv_name, device, secondary_node))
3416 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3417 instance, device, info):
3418 logger.Error("failed to create volume %s on primary!" %
3424 def _RemoveDisks(instance, cfg):
3425 """Remove all disks for an instance.
3427 This abstracts away some work from `AddInstance()` and
3428 `RemoveInstance()`. Note that in case some of the devices couldn't
3429 be removed, the removal will continue with the other ones (compare
3430 with `_CreateDisks()`).
3433 instance: the instance object
3436 True or False showing the success of the removal proces
3439 logger.Info("removing block devices for instance %s" % instance.name)
3442 for device in instance.disks:
3443 for node, disk in device.ComputeNodeTree(instance.primary_node):
3444 cfg.SetDiskID(disk, node)
3445 if not rpc.call_blockdev_remove(node, disk):
3446 logger.Error("could not remove block device %s on node %s,"
3447 " continuing anyway" %
3448 (device.iv_name, node))
3453 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3454 """Compute disk size requirements in the volume group
3456 This is currently hard-coded for the two-drive layout.
3459 # Required free disk space as a function of disk and swap space
3461 constants.DT_DISKLESS: None,
3462 constants.DT_PLAIN: disk_size + swap_size,
3463 constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3464 # 256 MB are added for drbd metadata, 128MB for each drbd device
3465 constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3466 constants.DT_DRBD8: disk_size + swap_size + 256,
3469 if disk_template not in req_size_dict:
3470 raise errors.ProgrammerError("Disk template '%s' size requirement"
3471 " is unknown" % disk_template)
3473 return req_size_dict[disk_template]
3476 class LUCreateInstance(LogicalUnit):
3477 """Create an instance.
3480 HPATH = "instance-add"
3481 HTYPE = constants.HTYPE_INSTANCE
3482 _OP_REQP = ["instance_name", "mem_size", "disk_size",
3483 "disk_template", "swap_size", "mode", "start", "vcpus",
3484 "wait_for_sync", "ip_check", "mac"]
3486 def _RunAllocator(self):
3487 """Run the allocator based on input opcode.
3490 disks = [{"size": self.op.disk_size, "mode": "w"},
3491 {"size": self.op.swap_size, "mode": "w"}]
3492 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3493 "bridge": self.op.bridge}]
3494 ial = IAllocator(self.cfg, self.sstore,
3495 mode=constants.IALLOCATOR_MODE_ALLOC,
3496 name=self.op.instance_name,
3497 disk_template=self.op.disk_template,
3500 vcpus=self.op.vcpus,
3501 mem_size=self.op.mem_size,
3506 ial.Run(self.op.iallocator)
3509 raise errors.OpPrereqError("Can't compute nodes using"
3510 " iallocator '%s': %s" % (self.op.iallocator,
3512 if len(ial.nodes) != ial.required_nodes:
3513 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3514 " of nodes (%s), required %s" %
3515 (len(ial.nodes), ial.required_nodes))
3516 self.op.pnode = ial.nodes[0]
3517 logger.ToStdout("Selected nodes for the instance: %s" %
3518 (", ".join(ial.nodes),))
3519 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3520 (self.op.instance_name, self.op.iallocator, ial.nodes))
3521 if ial.required_nodes == 2:
3522 self.op.snode = ial.nodes[1]
3524 def BuildHooksEnv(self):
3527 This runs on master, primary and secondary nodes of the instance.
3531 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3532 "INSTANCE_DISK_SIZE": self.op.disk_size,
3533 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3534 "INSTANCE_ADD_MODE": self.op.mode,
3536 if self.op.mode == constants.INSTANCE_IMPORT:
3537 env["INSTANCE_SRC_NODE"] = self.op.src_node
3538 env["INSTANCE_SRC_PATH"] = self.op.src_path
3539 env["INSTANCE_SRC_IMAGE"] = self.src_image
3541 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3542 primary_node=self.op.pnode,
3543 secondary_nodes=self.secondaries,
3544 status=self.instance_status,
3545 os_type=self.op.os_type,
3546 memory=self.op.mem_size,
3547 vcpus=self.op.vcpus,
3548 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3551 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3556 def CheckPrereq(self):
3557 """Check prerequisites.
3560 # set optional parameters to none if they don't exist
3561 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3562 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3563 "vnc_bind_address"]:
3564 if not hasattr(self.op, attr):
3565 setattr(self.op, attr, None)
3567 if self.op.mode not in (constants.INSTANCE_CREATE,
3568 constants.INSTANCE_IMPORT):
3569 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3572 if self.op.mode == constants.INSTANCE_IMPORT:
3573 src_node = getattr(self.op, "src_node", None)
3574 src_path = getattr(self.op, "src_path", None)
3575 if src_node is None or src_path is None:
3576 raise errors.OpPrereqError("Importing an instance requires source"
3577 " node and path options")
3578 src_node_full = self.cfg.ExpandNodeName(src_node)
3579 if src_node_full is None:
3580 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3581 self.op.src_node = src_node = src_node_full
3583 if not os.path.isabs(src_path):
3584 raise errors.OpPrereqError("The source path must be absolute")
3586 export_info = rpc.call_export_info(src_node, src_path)
3589 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3591 if not export_info.has_section(constants.INISECT_EXP):
3592 raise errors.ProgrammerError("Corrupted export config")
3594 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3595 if (int(ei_version) != constants.EXPORT_VERSION):
3596 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3597 (ei_version, constants.EXPORT_VERSION))
3599 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3600 raise errors.OpPrereqError("Can't import instance with more than"
3603 # FIXME: are the old os-es, disk sizes, etc. useful?
3604 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3605 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3607 self.src_image = diskimage
3608 else: # INSTANCE_CREATE
3609 if getattr(self.op, "os_type", None) is None:
3610 raise errors.OpPrereqError("No guest OS specified")
3612 #### instance parameters check
3614 # disk template and mirror node verification
3615 if self.op.disk_template not in constants.DISK_TEMPLATES:
3616 raise errors.OpPrereqError("Invalid disk template name")
3618 # instance name verification
3619 hostname1 = utils.HostInfo(self.op.instance_name)
3621 self.op.instance_name = instance_name = hostname1.name
3622 instance_list = self.cfg.GetInstanceList()
3623 if instance_name in instance_list:
3624 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3627 # ip validity checks
3628 ip = getattr(self.op, "ip", None)
3629 if ip is None or ip.lower() == "none":
3631 elif ip.lower() == "auto":
3632 inst_ip = hostname1.ip
3634 if not utils.IsValidIP(ip):
3635 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3636 " like a valid IP" % ip)
3638 self.inst_ip = self.op.ip = inst_ip
3640 if self.op.start and not self.op.ip_check:
3641 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3642 " adding an instance in start mode")
3644 if self.op.ip_check:
3645 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3646 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3647 (hostname1.ip, instance_name))
3649 # MAC address verification
3650 if self.op.mac != "auto":
3651 if not utils.IsValidMac(self.op.mac.lower()):
3652 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3655 # bridge verification
3656 bridge = getattr(self.op, "bridge", None)
3658 self.op.bridge = self.cfg.GetDefBridge()
3660 self.op.bridge = bridge
3662 # boot order verification
3663 if self.op.hvm_boot_order is not None:
3664 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3665 raise errors.OpPrereqError("invalid boot order specified,"
3666 " must be one or more of [acdn]")
3669 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3670 raise errors.OpPrereqError("One and only one of iallocator and primary"
3671 " node must be given")
3673 if self.op.iallocator is not None:
3674 self._RunAllocator()
3676 #### node related checks
3678 # check primary node
3679 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3681 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3683 self.op.pnode = pnode.name
3685 self.secondaries = []
3687 # mirror node verification
3688 if self.op.disk_template in constants.DTS_NET_MIRROR:
3689 if getattr(self.op, "snode", None) is None:
3690 raise errors.OpPrereqError("The networked disk templates need"
3693 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3694 if snode_name is None:
3695 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3697 elif snode_name == pnode.name:
3698 raise errors.OpPrereqError("The secondary node cannot be"
3699 " the primary node.")
3700 self.secondaries.append(snode_name)
3702 req_size = _ComputeDiskSize(self.op.disk_template,
3703 self.op.disk_size, self.op.swap_size)
3705 # Check lv size requirements
3706 if req_size is not None:
3707 nodenames = [pnode.name] + self.secondaries
3708 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3709 for node in nodenames:
3710 info = nodeinfo.get(node, None)
3712 raise errors.OpPrereqError("Cannot get current information"
3713 " from node '%s'" % node)
3714 vg_free = info.get('vg_free', None)
3715 if not isinstance(vg_free, int):
3716 raise errors.OpPrereqError("Can't compute free disk space on"
3718 if req_size > info['vg_free']:
3719 raise errors.OpPrereqError("Not enough disk space on target node %s."
3720 " %d MB available, %d MB required" %
3721 (node, info['vg_free'], req_size))
3724 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3726 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3727 " primary node" % self.op.os_type)
3729 if self.op.kernel_path == constants.VALUE_NONE:
3730 raise errors.OpPrereqError("Can't set instance kernel to none")
3733 # bridge check on primary node
3734 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3735 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3736 " destination node '%s'" %
3737 (self.op.bridge, pnode.name))
3739 # memory check on primary node
3741 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3742 "creating instance %s" % self.op.instance_name,
3745 # hvm_cdrom_image_path verification
3746 if self.op.hvm_cdrom_image_path is not None:
3747 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3748 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3749 " be an absolute path or None, not %s" %
3750 self.op.hvm_cdrom_image_path)
3751 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3752 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3753 " regular file or a symlink pointing to"
3754 " an existing regular file, not %s" %
3755 self.op.hvm_cdrom_image_path)
3757 # vnc_bind_address verification
3758 if self.op.vnc_bind_address is not None:
3759 if not utils.IsValidIP(self.op.vnc_bind_address):
3760 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3761 " like a valid IP address" %
3762 self.op.vnc_bind_address)
3765 self.instance_status = 'up'
3767 self.instance_status = 'down'
3769 def Exec(self, feedback_fn):
3770 """Create and add the instance to the cluster.
3773 instance = self.op.instance_name
3774 pnode_name = self.pnode.name
3776 if self.op.mac == "auto":
3777 mac_address = self.cfg.GenerateMAC()
3779 mac_address = self.op.mac
3781 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3782 if self.inst_ip is not None:
3783 nic.ip = self.inst_ip
3785 ht_kind = self.sstore.GetHypervisorType()
3786 if ht_kind in constants.HTS_REQ_PORT:
3787 network_port = self.cfg.AllocatePort()
3791 if self.op.vnc_bind_address is None:
3792 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3794 disks = _GenerateDiskTemplate(self.cfg,
3795 self.op.disk_template,
3796 instance, pnode_name,
3797 self.secondaries, self.op.disk_size,
3800 iobj = objects.Instance(name=instance, os=self.op.os_type,
3801 primary_node=pnode_name,
3802 memory=self.op.mem_size,
3803 vcpus=self.op.vcpus,
3804 nics=[nic], disks=disks,
3805 disk_template=self.op.disk_template,
3806 status=self.instance_status,
3807 network_port=network_port,
3808 kernel_path=self.op.kernel_path,
3809 initrd_path=self.op.initrd_path,
3810 hvm_boot_order=self.op.hvm_boot_order,
3811 hvm_acpi=self.op.hvm_acpi,
3812 hvm_pae=self.op.hvm_pae,
3813 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3814 vnc_bind_address=self.op.vnc_bind_address,
3817 feedback_fn("* creating instance disks...")
3818 if not _CreateDisks(self.cfg, iobj):
3819 _RemoveDisks(iobj, self.cfg)
3820 raise errors.OpExecError("Device creation failed, reverting...")
3822 feedback_fn("adding instance %s to cluster config" % instance)
3824 self.cfg.AddInstance(iobj)
3826 if self.op.wait_for_sync:
3827 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3828 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3829 # make sure the disks are not degraded (still sync-ing is ok)
3831 feedback_fn("* checking mirrors status")
3832 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3837 _RemoveDisks(iobj, self.cfg)
3838 self.cfg.RemoveInstance(iobj.name)
3839 raise errors.OpExecError("There are some degraded disks for"
3842 feedback_fn("creating os for instance %s on node %s" %
3843 (instance, pnode_name))
3845 if iobj.disk_template != constants.DT_DISKLESS:
3846 if self.op.mode == constants.INSTANCE_CREATE:
3847 feedback_fn("* running the instance OS create scripts...")
3848 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3849 raise errors.OpExecError("could not add os for instance %s"
3851 (instance, pnode_name))
3853 elif self.op.mode == constants.INSTANCE_IMPORT:
3854 feedback_fn("* running the instance OS import scripts...")
3855 src_node = self.op.src_node
3856 src_image = self.src_image
3857 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3858 src_node, src_image):
3859 raise errors.OpExecError("Could not import os for instance"
3861 (instance, pnode_name))
3863 # also checked in the prereq part
3864 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3868 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3869 feedback_fn("* starting instance...")
3870 if not rpc.call_instance_start(pnode_name, iobj, None):
3871 raise errors.OpExecError("Could not start instance")
3874 class LUConnectConsole(NoHooksLU):
3875 """Connect to an instance's console.
3877 This is somewhat special in that it returns the command line that
3878 you need to run on the master node in order to connect to the
3882 _OP_REQP = ["instance_name"]
3884 def CheckPrereq(self):
3885 """Check prerequisites.
3887 This checks that the instance is in the cluster.
3890 instance = self.cfg.GetInstanceInfo(
3891 self.cfg.ExpandInstanceName(self.op.instance_name))
3892 if instance is None:
3893 raise errors.OpPrereqError("Instance '%s' not known" %
3894 self.op.instance_name)
3895 self.instance = instance
3897 def Exec(self, feedback_fn):
3898 """Connect to the console of an instance
3901 instance = self.instance
3902 node = instance.primary_node
3904 node_insts = rpc.call_instance_list([node])[node]
3905 if node_insts is False:
3906 raise errors.OpExecError("Can't connect to node %s." % node)
3908 if instance.name not in node_insts:
3909 raise errors.OpExecError("Instance %s is not running." % instance.name)
3911 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3913 hyper = hypervisor.GetHypervisor()
3914 console_cmd = hyper.GetShellCommandForConsole(instance)
3916 argv = ["ssh", "-q", "-t"]
3917 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3918 argv.extend(ssh.BATCH_MODE_OPTS)
3920 argv.append(console_cmd)
3924 class LUAddMDDRBDComponent(LogicalUnit):
3925 """Adda new mirror member to an instance's disk.
3928 HPATH = "mirror-add"
3929 HTYPE = constants.HTYPE_INSTANCE
3930 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3932 def BuildHooksEnv(self):
3935 This runs on the master, the primary and all the secondaries.
3939 "NEW_SECONDARY": self.op.remote_node,
3940 "DISK_NAME": self.op.disk_name,
3942 env.update(_BuildInstanceHookEnvByObject(self.instance))
3943 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3944 self.op.remote_node,] + list(self.instance.secondary_nodes)
3947 def CheckPrereq(self):
3948 """Check prerequisites.
3950 This checks that the instance is in the cluster.
3953 instance = self.cfg.GetInstanceInfo(
3954 self.cfg.ExpandInstanceName(self.op.instance_name))
3955 if instance is None:
3956 raise errors.OpPrereqError("Instance '%s' not known" %
3957 self.op.instance_name)
3958 self.instance = instance
3960 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3961 if remote_node is None:
3962 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3963 self.remote_node = remote_node
3965 if remote_node == instance.primary_node:
3966 raise errors.OpPrereqError("The specified node is the primary node of"
3969 if instance.disk_template != constants.DT_REMOTE_RAID1:
3970 raise errors.OpPrereqError("Instance's disk layout is not"
3972 for disk in instance.disks:
3973 if disk.iv_name == self.op.disk_name:
3976 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3977 " instance." % self.op.disk_name)
3978 if len(disk.children) > 1:
3979 raise errors.OpPrereqError("The device already has two slave devices."
3980 " This would create a 3-disk raid1 which we"
3984 def Exec(self, feedback_fn):
3985 """Add the mirror component
3989 instance = self.instance
3991 remote_node = self.remote_node
3992 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3993 names = _GenerateUniqueNames(self.cfg, lv_names)
3994 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3995 remote_node, disk.size, names)
3997 logger.Info("adding new mirror component on secondary")
3999 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
4001 _GetInstanceInfoText(instance)):
4002 raise errors.OpExecError("Failed to create new component on secondary"
4003 " node %s" % remote_node)
4005 logger.Info("adding new mirror component on primary")
4007 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
4009 _GetInstanceInfoText(instance)):
4010 # remove secondary dev
4011 self.cfg.SetDiskID(new_drbd, remote_node)
4012 rpc.call_blockdev_remove(remote_node, new_drbd)
4013 raise errors.OpExecError("Failed to create volume on primary")
4015 # the device exists now
4016 # call the primary node to add the mirror to md
4017 logger.Info("adding new mirror component to md")
4018 if not rpc.call_blockdev_addchildren(instance.primary_node,
4020 logger.Error("Can't add mirror compoment to md!")
4021 self.cfg.SetDiskID(new_drbd, remote_node)
4022 if not rpc.call_blockdev_remove(remote_node, new_drbd):
4023 logger.Error("Can't rollback on secondary")
4024 self.cfg.SetDiskID(new_drbd, instance.primary_node)
4025 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4026 logger.Error("Can't rollback on primary")
4027 raise errors.OpExecError("Can't add mirror component to md array")
4029 disk.children.append(new_drbd)
4031 self.cfg.AddInstance(instance)
4033 _WaitForSync(self.cfg, instance, self.proc)
4038 class LURemoveMDDRBDComponent(LogicalUnit):
4039 """Remove a component from a remote_raid1 disk.
4042 HPATH = "mirror-remove"
4043 HTYPE = constants.HTYPE_INSTANCE
4044 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
4046 def BuildHooksEnv(self):
4049 This runs on the master, the primary and all the secondaries.
4053 "DISK_NAME": self.op.disk_name,
4054 "DISK_ID": self.op.disk_id,
4055 "OLD_SECONDARY": self.old_secondary,
4057 env.update(_BuildInstanceHookEnvByObject(self.instance))
4058 nl = [self.sstore.GetMasterNode(),
4059 self.instance.primary_node] + list(self.instance.secondary_nodes)
4062 def CheckPrereq(self):
4063 """Check prerequisites.
4065 This checks that the instance is in the cluster.
4068 instance = self.cfg.GetInstanceInfo(
4069 self.cfg.ExpandInstanceName(self.op.instance_name))
4070 if instance is None:
4071 raise errors.OpPrereqError("Instance '%s' not known" %
4072 self.op.instance_name)
4073 self.instance = instance
4075 if instance.disk_template != constants.DT_REMOTE_RAID1:
4076 raise errors.OpPrereqError("Instance's disk layout is not"
4078 for disk in instance.disks:
4079 if disk.iv_name == self.op.disk_name:
4082 raise errors.OpPrereqError("Can't find this device ('%s') in the"
4083 " instance." % self.op.disk_name)
4084 for child in disk.children:
4085 if (child.dev_type == constants.LD_DRBD7 and
4086 child.logical_id[2] == self.op.disk_id):
4089 raise errors.OpPrereqError("Can't find the device with this port.")
4091 if len(disk.children) < 2:
4092 raise errors.OpPrereqError("Cannot remove the last component from"
4096 if self.child.logical_id[0] == instance.primary_node:
4100 self.old_secondary = self.child.logical_id[oid]
4102 def Exec(self, feedback_fn):
4103 """Remove the mirror component
4106 instance = self.instance
4109 logger.Info("remove mirror component")
4110 self.cfg.SetDiskID(disk, instance.primary_node)
4111 if not rpc.call_blockdev_removechildren(instance.primary_node,
4113 raise errors.OpExecError("Can't remove child from mirror.")
4115 for node in child.logical_id[:2]:
4116 self.cfg.SetDiskID(child, node)
4117 if not rpc.call_blockdev_remove(node, child):
4118 logger.Error("Warning: failed to remove device from node %s,"
4119 " continuing operation." % node)
4121 disk.children.remove(child)
4122 self.cfg.AddInstance(instance)
4125 class LUReplaceDisks(LogicalUnit):
4126 """Replace the disks of an instance.
4129 HPATH = "mirrors-replace"
4130 HTYPE = constants.HTYPE_INSTANCE
4131 _OP_REQP = ["instance_name", "mode", "disks"]
4133 def _RunAllocator(self):
4134 """Compute a new secondary node using an IAllocator.
4137 ial = IAllocator(self.cfg, self.sstore,
4138 mode=constants.IALLOCATOR_MODE_RELOC,
4139 name=self.op.instance_name,
4140 relocate_from=[self.sec_node])
4142 ial.Run(self.op.iallocator)
4145 raise errors.OpPrereqError("Can't compute nodes using"
4146 " iallocator '%s': %s" % (self.op.iallocator,
4148 if len(ial.nodes) != ial.required_nodes:
4149 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4150 " of nodes (%s), required %s" %
4151 (len(ial.nodes), ial.required_nodes))
4152 self.op.remote_node = ial.nodes[0]
4153 logger.ToStdout("Selected new secondary for the instance: %s" %
4154 self.op.remote_node)
4156 def BuildHooksEnv(self):
4159 This runs on the master, the primary and all the secondaries.
4163 "MODE": self.op.mode,
4164 "NEW_SECONDARY": self.op.remote_node,
4165 "OLD_SECONDARY": self.instance.secondary_nodes[0],
4167 env.update(_BuildInstanceHookEnvByObject(self.instance))
4169 self.sstore.GetMasterNode(),
4170 self.instance.primary_node,
4172 if self.op.remote_node is not None:
4173 nl.append(self.op.remote_node)
4176 def CheckPrereq(self):
4177 """Check prerequisites.
4179 This checks that the instance is in the cluster.
4182 if not hasattr(self.op, "remote_node"):
4183 self.op.remote_node = None
4185 instance = self.cfg.GetInstanceInfo(
4186 self.cfg.ExpandInstanceName(self.op.instance_name))
4187 if instance is None:
4188 raise errors.OpPrereqError("Instance '%s' not known" %
4189 self.op.instance_name)
4190 self.instance = instance
4191 self.op.instance_name = instance.name
4193 if instance.disk_template not in constants.DTS_NET_MIRROR:
4194 raise errors.OpPrereqError("Instance's disk layout is not"
4195 " network mirrored.")
4197 if len(instance.secondary_nodes) != 1:
4198 raise errors.OpPrereqError("The instance has a strange layout,"
4199 " expected one secondary but found %d" %
4200 len(instance.secondary_nodes))
4202 self.sec_node = instance.secondary_nodes[0]
4204 ia_name = getattr(self.op, "iallocator", None)
4205 if ia_name is not None:
4206 if self.op.remote_node is not None:
4207 raise errors.OpPrereqError("Give either the iallocator or the new"
4208 " secondary, not both")
4209 self.op.remote_node = self._RunAllocator()
4211 remote_node = self.op.remote_node
4212 if remote_node is not None:
4213 remote_node = self.cfg.ExpandNodeName(remote_node)
4214 if remote_node is None:
4215 raise errors.OpPrereqError("Node '%s' not known" %
4216 self.op.remote_node)
4217 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4219 self.remote_node_info = None
4220 if remote_node == instance.primary_node:
4221 raise errors.OpPrereqError("The specified node is the primary node of"
4223 elif remote_node == self.sec_node:
4224 if self.op.mode == constants.REPLACE_DISK_SEC:
4225 # this is for DRBD8, where we can't execute the same mode of
4226 # replacement as for drbd7 (no different port allocated)
4227 raise errors.OpPrereqError("Same secondary given, cannot execute"
4229 # the user gave the current secondary, switch to
4230 # 'no-replace-secondary' mode for drbd7
4232 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
4233 self.op.mode != constants.REPLACE_DISK_ALL):
4234 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
4235 " disks replacement, not individual ones")
4236 if instance.disk_template == constants.DT_DRBD8:
4237 if (self.op.mode == constants.REPLACE_DISK_ALL and
4238 remote_node is not None):
4239 # switch to replace secondary mode
4240 self.op.mode = constants.REPLACE_DISK_SEC
4242 if self.op.mode == constants.REPLACE_DISK_ALL:
4243 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4244 " secondary disk replacement, not"
4246 elif self.op.mode == constants.REPLACE_DISK_PRI:
4247 if remote_node is not None:
4248 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4249 " the secondary while doing a primary"
4250 " node disk replacement")
4251 self.tgt_node = instance.primary_node
4252 self.oth_node = instance.secondary_nodes[0]
4253 elif self.op.mode == constants.REPLACE_DISK_SEC:
4254 self.new_node = remote_node # this can be None, in which case
4255 # we don't change the secondary
4256 self.tgt_node = instance.secondary_nodes[0]
4257 self.oth_node = instance.primary_node
4259 raise errors.ProgrammerError("Unhandled disk replace mode")
4261 for name in self.op.disks:
4262 if instance.FindDisk(name) is None:
4263 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4264 (name, instance.name))
4265 self.op.remote_node = remote_node
4267 def _ExecRR1(self, feedback_fn):
4268 """Replace the disks of an instance.
4271 instance = self.instance
4274 if self.op.remote_node is None:
4275 remote_node = self.sec_node
4277 remote_node = self.op.remote_node
4279 for dev in instance.disks:
4281 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4282 names = _GenerateUniqueNames(cfg, lv_names)
4283 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
4284 remote_node, size, names)
4285 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
4286 logger.Info("adding new mirror component on secondary for %s" %
4289 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
4291 _GetInstanceInfoText(instance)):
4292 raise errors.OpExecError("Failed to create new component on secondary"
4293 " node %s. Full abort, cleanup manually!" %
4296 logger.Info("adding new mirror component on primary")
4298 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
4300 _GetInstanceInfoText(instance)):
4301 # remove secondary dev
4302 cfg.SetDiskID(new_drbd, remote_node)
4303 rpc.call_blockdev_remove(remote_node, new_drbd)
4304 raise errors.OpExecError("Failed to create volume on primary!"
4305 " Full abort, cleanup manually!!")
4307 # the device exists now
4308 # call the primary node to add the mirror to md
4309 logger.Info("adding new mirror component to md")
4310 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
4312 logger.Error("Can't add mirror compoment to md!")
4313 cfg.SetDiskID(new_drbd, remote_node)
4314 if not rpc.call_blockdev_remove(remote_node, new_drbd):
4315 logger.Error("Can't rollback on secondary")
4316 cfg.SetDiskID(new_drbd, instance.primary_node)
4317 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4318 logger.Error("Can't rollback on primary")
4319 raise errors.OpExecError("Full abort, cleanup manually!!")
4321 dev.children.append(new_drbd)
4322 cfg.AddInstance(instance)
4324 # this can fail as the old devices are degraded and _WaitForSync
4325 # does a combined result over all disks, so we don't check its
4327 _WaitForSync(cfg, instance, self.proc, unlock=True)
4329 # so check manually all the devices
4330 for name in iv_names:
4331 dev, child, new_drbd = iv_names[name]
4332 cfg.SetDiskID(dev, instance.primary_node)
4333 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4335 raise errors.OpExecError("MD device %s is degraded!" % name)
4336 cfg.SetDiskID(new_drbd, instance.primary_node)
4337 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
4339 raise errors.OpExecError("New drbd device %s is degraded!" % name)
4341 for name in iv_names:
4342 dev, child, new_drbd = iv_names[name]
4343 logger.Info("remove mirror %s component" % name)
4344 cfg.SetDiskID(dev, instance.primary_node)
4345 if not rpc.call_blockdev_removechildren(instance.primary_node,
4347 logger.Error("Can't remove child from mirror, aborting"
4348 " *this device cleanup*.\nYou need to cleanup manually!!")
4351 for node in child.logical_id[:2]:
4352 logger.Info("remove child device on %s" % node)
4353 cfg.SetDiskID(child, node)
4354 if not rpc.call_blockdev_remove(node, child):
4355 logger.Error("Warning: failed to remove device from node %s,"
4356 " continuing operation." % node)
4358 dev.children.remove(child)
4360 cfg.AddInstance(instance)
4362 def _ExecD8DiskOnly(self, feedback_fn):
4363 """Replace a disk on the primary or secondary for dbrd8.
4365 The algorithm for replace is quite complicated:
4366 - for each disk to be replaced:
4367 - create new LVs on the target node with unique names
4368 - detach old LVs from the drbd device
4369 - rename old LVs to name_replaced.<time_t>
4370 - rename new LVs to old LVs
4371 - attach the new LVs (with the old names now) to the drbd device
4372 - wait for sync across all devices
4373 - for each modified disk:
4374 - remove old LVs (which have the name name_replaces.<time_t>)
4376 Failures are not very well handled.
4380 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4381 instance = self.instance
4383 vgname = self.cfg.GetVGName()
4386 tgt_node = self.tgt_node
4387 oth_node = self.oth_node
4389 # Step: check device activation
4390 self.proc.LogStep(1, steps_total, "check device existence")
4391 info("checking volume groups")
4392 my_vg = cfg.GetVGName()
4393 results = rpc.call_vg_list([oth_node, tgt_node])
4395 raise errors.OpExecError("Can't list volume groups on the nodes")
4396 for node in oth_node, tgt_node:
4397 res = results.get(node, False)
4398 if not res or my_vg not in res:
4399 raise errors.OpExecError("Volume group '%s' not found on %s" %
4401 for dev in instance.disks:
4402 if not dev.iv_name in self.op.disks:
4404 for node in tgt_node, oth_node:
4405 info("checking %s on %s" % (dev.iv_name, node))
4406 cfg.SetDiskID(dev, node)
4407 if not rpc.call_blockdev_find(node, dev):
4408 raise errors.OpExecError("Can't find device %s on node %s" %
4409 (dev.iv_name, node))
4411 # Step: check other node consistency
4412 self.proc.LogStep(2, steps_total, "check peer consistency")
4413 for dev in instance.disks:
4414 if not dev.iv_name in self.op.disks:
4416 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4417 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4418 oth_node==instance.primary_node):
4419 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4420 " to replace disks on this node (%s)" %
4421 (oth_node, tgt_node))
4423 # Step: create new storage
4424 self.proc.LogStep(3, steps_total, "allocate new storage")
4425 for dev in instance.disks:
4426 if not dev.iv_name in self.op.disks:
4429 cfg.SetDiskID(dev, tgt_node)
4430 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4431 names = _GenerateUniqueNames(cfg, lv_names)
4432 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4433 logical_id=(vgname, names[0]))
4434 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4435 logical_id=(vgname, names[1]))
4436 new_lvs = [lv_data, lv_meta]
4437 old_lvs = dev.children
4438 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4439 info("creating new local storage on %s for %s" %
4440 (tgt_node, dev.iv_name))
4441 # since we *always* want to create this LV, we use the
4442 # _Create...OnPrimary (which forces the creation), even if we
4443 # are talking about the secondary node
4444 for new_lv in new_lvs:
4445 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4446 _GetInstanceInfoText(instance)):
4447 raise errors.OpExecError("Failed to create new LV named '%s' on"
4449 (new_lv.logical_id[1], tgt_node))
4451 # Step: for each lv, detach+rename*2+attach
4452 self.proc.LogStep(4, steps_total, "change drbd configuration")
4453 for dev, old_lvs, new_lvs in iv_names.itervalues():
4454 info("detaching %s drbd from local storage" % dev.iv_name)
4455 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4456 raise errors.OpExecError("Can't detach drbd from local storage on node"
4457 " %s for device %s" % (tgt_node, dev.iv_name))
4459 #cfg.Update(instance)
4461 # ok, we created the new LVs, so now we know we have the needed
4462 # storage; as such, we proceed on the target node to rename
4463 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4464 # using the assumption that logical_id == physical_id (which in
4465 # turn is the unique_id on that node)
4467 # FIXME(iustin): use a better name for the replaced LVs
4468 temp_suffix = int(time.time())
4469 ren_fn = lambda d, suff: (d.physical_id[0],
4470 d.physical_id[1] + "_replaced-%s" % suff)
4471 # build the rename list based on what LVs exist on the node
4473 for to_ren in old_lvs:
4474 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4475 if find_res is not None: # device exists
4476 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4478 info("renaming the old LVs on the target node")
4479 if not rpc.call_blockdev_rename(tgt_node, rlist):
4480 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4481 # now we rename the new LVs to the old LVs
4482 info("renaming the new LVs on the target node")
4483 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4484 if not rpc.call_blockdev_rename(tgt_node, rlist):
4485 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4487 for old, new in zip(old_lvs, new_lvs):
4488 new.logical_id = old.logical_id
4489 cfg.SetDiskID(new, tgt_node)
4491 for disk in old_lvs:
4492 disk.logical_id = ren_fn(disk, temp_suffix)
4493 cfg.SetDiskID(disk, tgt_node)
4495 # now that the new lvs have the old name, we can add them to the device
4496 info("adding new mirror component on %s" % tgt_node)
4497 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4498 for new_lv in new_lvs:
4499 if not rpc.call_blockdev_remove(tgt_node, new_lv):
4500 warning("Can't rollback device %s", hint="manually cleanup unused"
4502 raise errors.OpExecError("Can't add local storage to drbd")
4504 dev.children = new_lvs
4505 cfg.Update(instance)
4507 # Step: wait for sync
4509 # this can fail as the old devices are degraded and _WaitForSync
4510 # does a combined result over all disks, so we don't check its
4512 self.proc.LogStep(5, steps_total, "sync devices")
4513 _WaitForSync(cfg, instance, self.proc, unlock=True)
4515 # so check manually all the devices
4516 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4517 cfg.SetDiskID(dev, instance.primary_node)
4518 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4520 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4522 # Step: remove old storage
4523 self.proc.LogStep(6, steps_total, "removing old storage")
4524 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4525 info("remove logical volumes for %s" % name)
4527 cfg.SetDiskID(lv, tgt_node)
4528 if not rpc.call_blockdev_remove(tgt_node, lv):
4529 warning("Can't remove old LV", hint="manually remove unused LVs")
4532 def _ExecD8Secondary(self, feedback_fn):
4533 """Replace the secondary node for drbd8.
4535 The algorithm for replace is quite complicated:
4536 - for all disks of the instance:
4537 - create new LVs on the new node with same names
4538 - shutdown the drbd device on the old secondary
4539 - disconnect the drbd network on the primary
4540 - create the drbd device on the new secondary
4541 - network attach the drbd on the primary, using an artifice:
4542 the drbd code for Attach() will connect to the network if it
4543 finds a device which is connected to the good local disks but
4545 - wait for sync across all devices
4546 - remove all disks from the old secondary
4548 Failures are not very well handled.
4552 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4553 instance = self.instance
4555 vgname = self.cfg.GetVGName()
4558 old_node = self.tgt_node
4559 new_node = self.new_node
4560 pri_node = instance.primary_node
4562 # Step: check device activation
4563 self.proc.LogStep(1, steps_total, "check device existence")
4564 info("checking volume groups")
4565 my_vg = cfg.GetVGName()
4566 results = rpc.call_vg_list([pri_node, new_node])
4568 raise errors.OpExecError("Can't list volume groups on the nodes")
4569 for node in pri_node, new_node:
4570 res = results.get(node, False)
4571 if not res or my_vg not in res:
4572 raise errors.OpExecError("Volume group '%s' not found on %s" %
4574 for dev in instance.disks:
4575 if not dev.iv_name in self.op.disks:
4577 info("checking %s on %s" % (dev.iv_name, pri_node))
4578 cfg.SetDiskID(dev, pri_node)
4579 if not rpc.call_blockdev_find(pri_node, dev):
4580 raise errors.OpExecError("Can't find device %s on node %s" %
4581 (dev.iv_name, pri_node))
4583 # Step: check other node consistency
4584 self.proc.LogStep(2, steps_total, "check peer consistency")
4585 for dev in instance.disks:
4586 if not dev.iv_name in self.op.disks:
4588 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4589 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4590 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4591 " unsafe to replace the secondary" %
4594 # Step: create new storage
4595 self.proc.LogStep(3, steps_total, "allocate new storage")
4596 for dev in instance.disks:
4598 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4599 # since we *always* want to create this LV, we use the
4600 # _Create...OnPrimary (which forces the creation), even if we
4601 # are talking about the secondary node
4602 for new_lv in dev.children:
4603 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4604 _GetInstanceInfoText(instance)):
4605 raise errors.OpExecError("Failed to create new LV named '%s' on"
4607 (new_lv.logical_id[1], new_node))
4609 iv_names[dev.iv_name] = (dev, dev.children)
4611 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4612 for dev in instance.disks:
4614 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4615 # create new devices on new_node
4616 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4617 logical_id=(pri_node, new_node,
4619 children=dev.children)
4620 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4622 _GetInstanceInfoText(instance)):
4623 raise errors.OpExecError("Failed to create new DRBD on"
4624 " node '%s'" % new_node)
4626 for dev in instance.disks:
4627 # we have new devices, shutdown the drbd on the old secondary
4628 info("shutting down drbd for %s on old node" % dev.iv_name)
4629 cfg.SetDiskID(dev, old_node)
4630 if not rpc.call_blockdev_shutdown(old_node, dev):
4631 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4632 hint="Please cleanup this device manually as soon as possible")
4634 info("detaching primary drbds from the network (=> standalone)")
4636 for dev in instance.disks:
4637 cfg.SetDiskID(dev, pri_node)
4638 # set the physical (unique in bdev terms) id to None, meaning
4639 # detach from network
4640 dev.physical_id = (None,) * len(dev.physical_id)
4641 # and 'find' the device, which will 'fix' it to match the
4643 if rpc.call_blockdev_find(pri_node, dev):
4646 warning("Failed to detach drbd %s from network, unusual case" %
4650 # no detaches succeeded (very unlikely)
4651 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4653 # if we managed to detach at least one, we update all the disks of
4654 # the instance to point to the new secondary
4655 info("updating instance configuration")
4656 for dev in instance.disks:
4657 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4658 cfg.SetDiskID(dev, pri_node)
4659 cfg.Update(instance)
4661 # and now perform the drbd attach
4662 info("attaching primary drbds to new secondary (standalone => connected)")
4664 for dev in instance.disks:
4665 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4666 # since the attach is smart, it's enough to 'find' the device,
4667 # it will automatically activate the network, if the physical_id
4669 cfg.SetDiskID(dev, pri_node)
4670 if not rpc.call_blockdev_find(pri_node, dev):
4671 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4672 "please do a gnt-instance info to see the status of disks")
4674 # this can fail as the old devices are degraded and _WaitForSync
4675 # does a combined result over all disks, so we don't check its
4677 self.proc.LogStep(5, steps_total, "sync devices")
4678 _WaitForSync(cfg, instance, self.proc, unlock=True)
4680 # so check manually all the devices
4681 for name, (dev, old_lvs) in iv_names.iteritems():
4682 cfg.SetDiskID(dev, pri_node)
4683 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4685 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4687 self.proc.LogStep(6, steps_total, "removing old storage")
4688 for name, (dev, old_lvs) in iv_names.iteritems():
4689 info("remove logical volumes for %s" % name)
4691 cfg.SetDiskID(lv, old_node)
4692 if not rpc.call_blockdev_remove(old_node, lv):
4693 warning("Can't remove LV on old secondary",
4694 hint="Cleanup stale volumes by hand")
4696 def Exec(self, feedback_fn):
4697 """Execute disk replacement.
4699 This dispatches the disk replacement to the appropriate handler.
4702 instance = self.instance
4704 # Activate the instance disks if we're replacing them on a down instance
4705 if instance.status == "down":
4706 op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
4707 self.proc.ChainOpCode(op)
4709 if instance.disk_template == constants.DT_REMOTE_RAID1:
4711 elif instance.disk_template == constants.DT_DRBD8:
4712 if self.op.remote_node is None:
4713 fn = self._ExecD8DiskOnly
4715 fn = self._ExecD8Secondary
4717 raise errors.ProgrammerError("Unhandled disk replacement case")
4719 ret = fn(feedback_fn)
4721 # Deactivate the instance disks if we're replacing them on a down instance
4722 if instance.status == "down":
4723 op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
4724 self.proc.ChainOpCode(op)
4729 class LUGrowDisk(LogicalUnit):
4730 """Grow a disk of an instance.
4734 HTYPE = constants.HTYPE_INSTANCE
4735 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4737 def BuildHooksEnv(self):
4740 This runs on the master, the primary and all the secondaries.
4744 "DISK": self.op.disk,
4745 "AMOUNT": self.op.amount,
4747 env.update(_BuildInstanceHookEnvByObject(self.instance))
4749 self.sstore.GetMasterNode(),
4750 self.instance.primary_node,
4754 def CheckPrereq(self):
4755 """Check prerequisites.
4757 This checks that the instance is in the cluster.
4760 instance = self.cfg.GetInstanceInfo(
4761 self.cfg.ExpandInstanceName(self.op.instance_name))
4762 if instance is None:
4763 raise errors.OpPrereqError("Instance '%s' not known" %
4764 self.op.instance_name)
4766 if self.op.amount <= 0:
4767 raise errors.OpPrereqError("Invalid grow-by amount: %s" % self.op.amount)
4769 self.instance = instance
4770 self.op.instance_name = instance.name
4772 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4773 raise errors.OpPrereqError("Instance's disk layout does not support"
4776 self.disk = instance.FindDisk(self.op.disk)
4777 if self.disk is None:
4778 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4779 (self.op.disk, instance.name))
4781 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4782 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4783 for node in nodenames:
4784 info = nodeinfo.get(node, None)
4786 raise errors.OpPrereqError("Cannot get current information"
4787 " from node '%s'" % node)
4788 vg_free = info.get('vg_free', None)
4789 if not isinstance(vg_free, int):
4790 raise errors.OpPrereqError("Can't compute free disk space on"
4792 if self.op.amount > info['vg_free']:
4793 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4794 " %d MiB available, %d MiB required" %
4795 (node, info['vg_free'], self.op.amount))
4796 is_primary = (node == instance.primary_node)
4797 if not _CheckDiskConsistency(self.cfg, self.disk, node, is_primary):
4798 raise errors.OpPrereqError("Disk %s is degraded or not fully"
4799 " synchronized on node %s,"
4800 " aborting grow." % (self.op.disk, node))
4802 def Exec(self, feedback_fn):
4803 """Execute disk grow.
4806 instance = self.instance
4808 for node in (instance.secondary_nodes + (instance.primary_node,)):
4809 self.cfg.SetDiskID(disk, node)
4810 result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4811 if not result or not isinstance(result, tuple) or len(result) != 2:
4812 raise errors.OpExecError("grow request failed to node %s" % node)
4814 raise errors.OpExecError("grow request failed to node %s: %s" %
4816 disk.RecordGrow(self.op.amount)
4817 self.cfg.Update(instance)
4818 if self.op.wait_for_sync:
4819 disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4821 logger.Error("Warning: disk sync-ing has not returned a good status.\n"
4822 " Please check the instance.")
4825 class LUQueryInstanceData(NoHooksLU):
4826 """Query runtime instance data.
4829 _OP_REQP = ["instances"]
4831 def CheckPrereq(self):
4832 """Check prerequisites.
4834 This only checks the optional instance list against the existing names.
4837 if not isinstance(self.op.instances, list):
4838 raise errors.OpPrereqError("Invalid argument type 'instances'")
4839 if self.op.instances:
4840 self.wanted_instances = []
4841 names = self.op.instances
4843 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4844 if instance is None:
4845 raise errors.OpPrereqError("No such instance name '%s'" % name)
4846 self.wanted_instances.append(instance)
4848 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4849 in self.cfg.GetInstanceList()]
4853 def _ComputeDiskStatus(self, instance, snode, dev):
4854 """Compute block device status.
4857 self.cfg.SetDiskID(dev, instance.primary_node)
4858 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4859 if dev.dev_type in constants.LDS_DRBD:
4860 # we change the snode then (otherwise we use the one passed in)
4861 if dev.logical_id[0] == instance.primary_node:
4862 snode = dev.logical_id[1]
4864 snode = dev.logical_id[0]
4867 self.cfg.SetDiskID(dev, snode)
4868 dev_sstatus = rpc.call_blockdev_find(snode, dev)
4873 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4874 for child in dev.children]
4879 "iv_name": dev.iv_name,
4880 "dev_type": dev.dev_type,
4881 "logical_id": dev.logical_id,
4882 "physical_id": dev.physical_id,
4883 "pstatus": dev_pstatus,
4884 "sstatus": dev_sstatus,
4885 "children": dev_children,
4890 def Exec(self, feedback_fn):
4891 """Gather and return data"""
4893 for instance in self.wanted_instances:
4894 remote_info = rpc.call_instance_info(instance.primary_node,
4896 if remote_info and "state" in remote_info:
4899 remote_state = "down"
4900 if instance.status == "down":
4901 config_state = "down"
4905 disks = [self._ComputeDiskStatus(instance, None, device)
4906 for device in instance.disks]
4909 "name": instance.name,
4910 "config_state": config_state,
4911 "run_state": remote_state,
4912 "pnode": instance.primary_node,
4913 "snodes": instance.secondary_nodes,
4915 "memory": instance.memory,
4916 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4918 "vcpus": instance.vcpus,
4921 htkind = self.sstore.GetHypervisorType()
4922 if htkind == constants.HT_XEN_PVM30:
4923 idict["kernel_path"] = instance.kernel_path
4924 idict["initrd_path"] = instance.initrd_path
4926 if htkind == constants.HT_XEN_HVM31:
4927 idict["hvm_boot_order"] = instance.hvm_boot_order
4928 idict["hvm_acpi"] = instance.hvm_acpi
4929 idict["hvm_pae"] = instance.hvm_pae
4930 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4932 if htkind in constants.HTS_REQ_PORT:
4933 idict["vnc_bind_address"] = instance.vnc_bind_address
4934 idict["network_port"] = instance.network_port
4936 result[instance.name] = idict
4941 class LUSetInstanceParms(LogicalUnit):
4942 """Modifies an instances's parameters.
4945 HPATH = "instance-modify"
4946 HTYPE = constants.HTYPE_INSTANCE
4947 _OP_REQP = ["instance_name"]
4949 def BuildHooksEnv(self):
4952 This runs on the master, primary and secondaries.
4957 args['memory'] = self.mem
4959 args['vcpus'] = self.vcpus
4960 if self.do_ip or self.do_bridge or self.mac:
4964 ip = self.instance.nics[0].ip
4966 bridge = self.bridge
4968 bridge = self.instance.nics[0].bridge
4972 mac = self.instance.nics[0].mac
4973 args['nics'] = [(ip, bridge, mac)]
4974 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4975 nl = [self.sstore.GetMasterNode(),
4976 self.instance.primary_node] + list(self.instance.secondary_nodes)
4979 def CheckPrereq(self):
4980 """Check prerequisites.
4982 This only checks the instance list against the existing names.
4985 self.mem = getattr(self.op, "mem", None)
4986 self.vcpus = getattr(self.op, "vcpus", None)
4987 self.ip = getattr(self.op, "ip", None)
4988 self.mac = getattr(self.op, "mac", None)
4989 self.bridge = getattr(self.op, "bridge", None)
4990 self.kernel_path = getattr(self.op, "kernel_path", None)
4991 self.initrd_path = getattr(self.op, "initrd_path", None)
4992 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4993 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4994 self.hvm_pae = getattr(self.op, "hvm_pae", None)
4995 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4996 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4997 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4998 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4999 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
5000 self.vnc_bind_address]
5001 if all_parms.count(None) == len(all_parms):
5002 raise errors.OpPrereqError("No changes submitted")
5003 if self.mem is not None:
5005 self.mem = int(self.mem)
5006 except ValueError, err:
5007 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
5008 if self.vcpus is not None:
5010 self.vcpus = int(self.vcpus)
5011 except ValueError, err:
5012 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
5013 if self.ip is not None:
5015 if self.ip.lower() == "none":
5018 if not utils.IsValidIP(self.ip):
5019 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
5022 self.do_bridge = (self.bridge is not None)
5023 if self.mac is not None:
5024 if self.cfg.IsMacInUse(self.mac):
5025 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
5027 if not utils.IsValidMac(self.mac):
5028 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
5030 if self.kernel_path is not None:
5031 self.do_kernel_path = True
5032 if self.kernel_path == constants.VALUE_NONE:
5033 raise errors.OpPrereqError("Can't set instance to no kernel")
5035 if self.kernel_path != constants.VALUE_DEFAULT:
5036 if not os.path.isabs(self.kernel_path):
5037 raise errors.OpPrereqError("The kernel path must be an absolute"
5040 self.do_kernel_path = False
5042 if self.initrd_path is not None:
5043 self.do_initrd_path = True
5044 if self.initrd_path not in (constants.VALUE_NONE,
5045 constants.VALUE_DEFAULT):
5046 if not os.path.isabs(self.initrd_path):
5047 raise errors.OpPrereqError("The initrd path must be an absolute"
5050 self.do_initrd_path = False
5052 # boot order verification
5053 if self.hvm_boot_order is not None:
5054 if self.hvm_boot_order != constants.VALUE_DEFAULT:
5055 if len(self.hvm_boot_order.strip("acdn")) != 0:
5056 raise errors.OpPrereqError("invalid boot order specified,"
5057 " must be one or more of [acdn]"
5060 # hvm_cdrom_image_path verification
5061 if self.op.hvm_cdrom_image_path is not None:
5062 if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
5063 self.op.hvm_cdrom_image_path.lower() == "none"):
5064 raise errors.OpPrereqError("The path to the HVM CDROM image must"
5065 " be an absolute path or None, not %s" %
5066 self.op.hvm_cdrom_image_path)
5067 if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
5068 self.op.hvm_cdrom_image_path.lower() == "none"):
5069 raise errors.OpPrereqError("The HVM CDROM image must either be a"
5070 " regular file or a symlink pointing to"
5071 " an existing regular file, not %s" %
5072 self.op.hvm_cdrom_image_path)
5074 # vnc_bind_address verification
5075 if self.op.vnc_bind_address is not None:
5076 if not utils.IsValidIP(self.op.vnc_bind_address):
5077 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
5078 " like a valid IP address" %
5079 self.op.vnc_bind_address)
5081 instance = self.cfg.GetInstanceInfo(
5082 self.cfg.ExpandInstanceName(self.op.instance_name))
5083 if instance is None:
5084 raise errors.OpPrereqError("No such instance name '%s'" %
5085 self.op.instance_name)
5086 self.op.instance_name = instance.name
5087 self.instance = instance
5090 def Exec(self, feedback_fn):
5091 """Modifies an instance.
5093 All parameters take effect only at the next restart of the instance.
5096 instance = self.instance
5098 instance.memory = self.mem
5099 result.append(("mem", self.mem))
5101 instance.vcpus = self.vcpus
5102 result.append(("vcpus", self.vcpus))
5104 instance.nics[0].ip = self.ip
5105 result.append(("ip", self.ip))
5107 instance.nics[0].bridge = self.bridge
5108 result.append(("bridge", self.bridge))
5110 instance.nics[0].mac = self.mac
5111 result.append(("mac", self.mac))
5112 if self.do_kernel_path:
5113 instance.kernel_path = self.kernel_path
5114 result.append(("kernel_path", self.kernel_path))
5115 if self.do_initrd_path:
5116 instance.initrd_path = self.initrd_path
5117 result.append(("initrd_path", self.initrd_path))
5118 if self.hvm_boot_order:
5119 if self.hvm_boot_order == constants.VALUE_DEFAULT:
5120 instance.hvm_boot_order = None
5122 instance.hvm_boot_order = self.hvm_boot_order
5123 result.append(("hvm_boot_order", self.hvm_boot_order))
5124 if self.hvm_acpi is not None:
5125 instance.hvm_acpi = self.hvm_acpi
5126 result.append(("hvm_acpi", self.hvm_acpi))
5127 if self.hvm_pae is not None:
5128 instance.hvm_pae = self.hvm_pae
5129 result.append(("hvm_pae", self.hvm_pae))
5130 if self.hvm_cdrom_image_path:
5131 if self.hvm_cdrom_image_path == constants.VALUE_NONE:
5132 instance.hvm_cdrom_image_path = None
5134 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
5135 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
5136 if self.vnc_bind_address:
5137 instance.vnc_bind_address = self.vnc_bind_address
5138 result.append(("vnc_bind_address", self.vnc_bind_address))
5140 self.cfg.AddInstance(instance)
5145 class LUQueryExports(NoHooksLU):
5146 """Query the exports list
5151 def CheckPrereq(self):
5152 """Check that the nodelist contains only existing nodes.
5155 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
5157 def Exec(self, feedback_fn):
5158 """Compute the list of all the exported system images.
5161 a dictionary with the structure node->(export-list)
5162 where export-list is a list of the instances exported on
5166 return rpc.call_export_list(self.nodes)
5169 class LUExportInstance(LogicalUnit):
5170 """Export an instance to an image in the cluster.
5173 HPATH = "instance-export"
5174 HTYPE = constants.HTYPE_INSTANCE
5175 _OP_REQP = ["instance_name", "target_node", "shutdown"]
5177 def BuildHooksEnv(self):
5180 This will run on the master, primary node and target node.
5184 "EXPORT_NODE": self.op.target_node,
5185 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5187 env.update(_BuildInstanceHookEnvByObject(self.instance))
5188 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
5189 self.op.target_node]
5192 def CheckPrereq(self):
5193 """Check prerequisites.
5195 This checks that the instance and node names are valid.
5198 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5199 self.instance = self.cfg.GetInstanceInfo(instance_name)
5200 if self.instance is None:
5201 raise errors.OpPrereqError("Instance '%s' not found" %
5202 self.op.instance_name)
5205 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
5206 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
5208 if self.dst_node is None:
5209 raise errors.OpPrereqError("Destination node '%s' is unknown." %
5210 self.op.target_node)
5211 self.op.target_node = self.dst_node.name
5213 def Exec(self, feedback_fn):
5214 """Export an instance to an image in the cluster.
5217 instance = self.instance
5218 dst_node = self.dst_node
5219 src_node = instance.primary_node
5220 if self.op.shutdown:
5221 # shutdown the instance, but not the disks
5222 if not rpc.call_instance_shutdown(src_node, instance):
5223 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5224 (instance.name, src_node))
5226 vgname = self.cfg.GetVGName()
5231 for disk in instance.disks:
5232 if disk.iv_name == "sda":
5233 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5234 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
5236 if not new_dev_name:
5237 logger.Error("could not snapshot block device %s on node %s" %
5238 (disk.logical_id[1], src_node))
5240 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5241 logical_id=(vgname, new_dev_name),
5242 physical_id=(vgname, new_dev_name),
5243 iv_name=disk.iv_name)
5244 snap_disks.append(new_dev)
5247 if self.op.shutdown and instance.status == "up":
5248 if not rpc.call_instance_start(src_node, instance, None):
5249 _ShutdownInstanceDisks(instance, self.cfg)
5250 raise errors.OpExecError("Could not start instance")
5252 # TODO: check for size
5254 for dev in snap_disks:
5255 if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
5257 logger.Error("could not export block device %s from node"
5259 (dev.logical_id[1], src_node, dst_node.name))
5260 if not rpc.call_blockdev_remove(src_node, dev):
5261 logger.Error("could not remove snapshot block device %s from"
5262 " node %s" % (dev.logical_id[1], src_node))
5264 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5265 logger.Error("could not finalize export for instance %s on node %s" %
5266 (instance.name, dst_node.name))
5268 nodelist = self.cfg.GetNodeList()
5269 nodelist.remove(dst_node.name)
5271 # on one-node clusters nodelist will be empty after the removal
5272 # if we proceed the backup would be removed because OpQueryExports
5273 # substitutes an empty list with the full cluster node list.
5275 op = opcodes.OpQueryExports(nodes=nodelist)
5276 exportlist = self.proc.ChainOpCode(op)
5277 for node in exportlist:
5278 if instance.name in exportlist[node]:
5279 if not rpc.call_export_remove(node, instance.name):
5280 logger.Error("could not remove older export for instance %s"
5281 " on node %s" % (instance.name, node))
5284 class LURemoveExport(NoHooksLU):
5285 """Remove exports related to the named instance.
5288 _OP_REQP = ["instance_name"]
5290 def CheckPrereq(self):
5291 """Check prerequisites.
5295 def Exec(self, feedback_fn):
5296 """Remove any export.
5299 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5300 # If the instance was not found we'll try with the name that was passed in.
5301 # This will only work if it was an FQDN, though.
5303 if not instance_name:
5305 instance_name = self.op.instance_name
5307 op = opcodes.OpQueryExports(nodes=[])
5308 exportlist = self.proc.ChainOpCode(op)
5310 for node in exportlist:
5311 if instance_name in exportlist[node]:
5313 if not rpc.call_export_remove(node, instance_name):
5314 logger.Error("could not remove export for instance %s"
5315 " on node %s" % (instance_name, node))
5317 if fqdn_warn and not found:
5318 feedback_fn("Export not found. If trying to remove an export belonging"
5319 " to a deleted instance please use its Fully Qualified"
5323 class TagsLU(NoHooksLU):
5326 This is an abstract class which is the parent of all the other tags LUs.
5329 def CheckPrereq(self):
5330 """Check prerequisites.
5333 if self.op.kind == constants.TAG_CLUSTER:
5334 self.target = self.cfg.GetClusterInfo()
5335 elif self.op.kind == constants.TAG_NODE:
5336 name = self.cfg.ExpandNodeName(self.op.name)
5338 raise errors.OpPrereqError("Invalid node name (%s)" %
5341 self.target = self.cfg.GetNodeInfo(name)
5342 elif self.op.kind == constants.TAG_INSTANCE:
5343 name = self.cfg.ExpandInstanceName(self.op.name)
5345 raise errors.OpPrereqError("Invalid instance name (%s)" %
5348 self.target = self.cfg.GetInstanceInfo(name)
5350 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5354 class LUGetTags(TagsLU):
5355 """Returns the tags of a given object.
5358 _OP_REQP = ["kind", "name"]
5360 def Exec(self, feedback_fn):
5361 """Returns the tag list.
5364 return self.target.GetTags()
5367 class LUSearchTags(NoHooksLU):
5368 """Searches the tags for a given pattern.
5371 _OP_REQP = ["pattern"]
5373 def CheckPrereq(self):
5374 """Check prerequisites.
5376 This checks the pattern passed for validity by compiling it.
5380 self.re = re.compile(self.op.pattern)
5381 except re.error, err:
5382 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5383 (self.op.pattern, err))
5385 def Exec(self, feedback_fn):
5386 """Returns the tag list.
5390 tgts = [("/cluster", cfg.GetClusterInfo())]
5391 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
5392 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5393 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
5394 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5396 for path, target in tgts:
5397 for tag in target.GetTags():
5398 if self.re.search(tag):
5399 results.append((path, tag))
5403 class LUAddTags(TagsLU):
5404 """Sets a tag on a given object.
5407 _OP_REQP = ["kind", "name", "tags"]
5409 def CheckPrereq(self):
5410 """Check prerequisites.
5412 This checks the type and length of the tag name and value.
5415 TagsLU.CheckPrereq(self)
5416 for tag in self.op.tags:
5417 objects.TaggableObject.ValidateTag(tag)
5419 def Exec(self, feedback_fn):
5424 for tag in self.op.tags:
5425 self.target.AddTag(tag)
5426 except errors.TagError, err:
5427 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5429 self.cfg.Update(self.target)
5430 except errors.ConfigurationError:
5431 raise errors.OpRetryError("There has been a modification to the"
5432 " config file and the operation has been"
5433 " aborted. Please retry.")
5436 class LUDelTags(TagsLU):
5437 """Delete a list of tags from a given object.
5440 _OP_REQP = ["kind", "name", "tags"]
5442 def CheckPrereq(self):
5443 """Check prerequisites.
5445 This checks that we have the given tag.
5448 TagsLU.CheckPrereq(self)
5449 for tag in self.op.tags:
5450 objects.TaggableObject.ValidateTag(tag, removal=True)
5451 del_tags = frozenset(self.op.tags)
5452 cur_tags = self.target.GetTags()
5453 if not del_tags <= cur_tags:
5454 diff_tags = del_tags - cur_tags
5455 diff_names = ["'%s'" % tag for tag in diff_tags]
5457 raise errors.OpPrereqError("Tag(s) %s not found" %
5458 (",".join(diff_names)))
5460 def Exec(self, feedback_fn):
5461 """Remove the tag from the object.
5464 for tag in self.op.tags:
5465 self.target.RemoveTag(tag)
5467 self.cfg.Update(self.target)
5468 except errors.ConfigurationError:
5469 raise errors.OpRetryError("There has been a modification to the"
5470 " config file and the operation has been"
5471 " aborted. Please retry.")
5473 class LUTestDelay(NoHooksLU):
5474 """Sleep for a specified amount of time.
5476 This LU sleeps on the master and/or nodes for a specified amoutn of
5480 _OP_REQP = ["duration", "on_master", "on_nodes"]
5482 def CheckPrereq(self):
5483 """Check prerequisites.
5485 This checks that we have a good list of nodes and/or the duration
5490 if self.op.on_nodes:
5491 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5493 def Exec(self, feedback_fn):
5494 """Do the actual sleep.
5497 if self.op.on_master:
5498 if not utils.TestDelay(self.op.duration):
5499 raise errors.OpExecError("Error during master delay test")
5500 if self.op.on_nodes:
5501 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5503 raise errors.OpExecError("Complete failure from rpc call")
5504 for node, node_result in result.items():
5506 raise errors.OpExecError("Failure during rpc call to node %s,"
5507 " result: %s" % (node, node_result))
5510 class IAllocator(object):
5511 """IAllocator framework.
5513 An IAllocator instance has three sets of attributes:
5514 - cfg/sstore that are needed to query the cluster
5515 - input data (all members of the _KEYS class attribute are required)
5516 - four buffer attributes (in|out_data|text), that represent the
5517 input (to the external script) in text and data structure format,
5518 and the output from it, again in two formats
5519 - the result variables from the script (success, info, nodes) for
5524 "mem_size", "disks", "disk_template",
5525 "os", "tags", "nics", "vcpus",
5531 def __init__(self, cfg, sstore, mode, name, **kwargs):
5533 self.sstore = sstore
5534 # init buffer variables
5535 self.in_text = self.out_text = self.in_data = self.out_data = None
5536 # init all input fields so that pylint is happy
5539 self.mem_size = self.disks = self.disk_template = None
5540 self.os = self.tags = self.nics = self.vcpus = None
5541 self.relocate_from = None
5543 self.required_nodes = None
5544 # init result fields
5545 self.success = self.info = self.nodes = None
5546 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5547 keyset = self._ALLO_KEYS
5548 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5549 keyset = self._RELO_KEYS
5551 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5552 " IAllocator" % self.mode)
5554 if key not in keyset:
5555 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5556 " IAllocator" % key)
5557 setattr(self, key, kwargs[key])
5559 if key not in kwargs:
5560 raise errors.ProgrammerError("Missing input parameter '%s' to"
5561 " IAllocator" % key)
5562 self._BuildInputData()
5564 def _ComputeClusterData(self):
5565 """Compute the generic allocator input data.
5567 This is the data that is independent of the actual operation.
5574 "cluster_name": self.sstore.GetClusterName(),
5575 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5576 "hypervisor_type": self.sstore.GetHypervisorType(),
5577 # we don't have job IDs
5580 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5584 node_list = cfg.GetNodeList()
5585 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5586 for nname in node_list:
5587 ninfo = cfg.GetNodeInfo(nname)
5588 if nname not in node_data or not isinstance(node_data[nname], dict):
5589 raise errors.OpExecError("Can't get data for node %s" % nname)
5590 remote_info = node_data[nname]
5591 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5592 'vg_size', 'vg_free', 'cpu_total']:
5593 if attr not in remote_info:
5594 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5597 remote_info[attr] = int(remote_info[attr])
5598 except ValueError, err:
5599 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5600 " %s" % (nname, attr, str(err)))
5601 # compute memory used by primary instances
5602 i_p_mem = i_p_up_mem = 0
5603 for iinfo in i_list:
5604 if iinfo.primary_node == nname:
5605 i_p_mem += iinfo.memory
5606 if iinfo.status == "up":
5607 i_p_up_mem += iinfo.memory
5609 # compute memory used by instances
5611 "tags": list(ninfo.GetTags()),
5612 "total_memory": remote_info['memory_total'],
5613 "reserved_memory": remote_info['memory_dom0'],
5614 "free_memory": remote_info['memory_free'],
5615 "i_pri_memory": i_p_mem,
5616 "i_pri_up_memory": i_p_up_mem,
5617 "total_disk": remote_info['vg_size'],
5618 "free_disk": remote_info['vg_free'],
5619 "primary_ip": ninfo.primary_ip,
5620 "secondary_ip": ninfo.secondary_ip,
5621 "total_cpus": remote_info['cpu_total'],
5623 node_results[nname] = pnr
5624 data["nodes"] = node_results
5628 for iinfo in i_list:
5629 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5630 for n in iinfo.nics]
5632 "tags": list(iinfo.GetTags()),
5633 "should_run": iinfo.status == "up",
5634 "vcpus": iinfo.vcpus,
5635 "memory": iinfo.memory,
5637 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5639 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5640 "disk_template": iinfo.disk_template,
5642 instance_data[iinfo.name] = pir
5644 data["instances"] = instance_data
5648 def _AddNewInstance(self):
5649 """Add new instance data to allocator structure.
5651 This in combination with _AllocatorGetClusterData will create the
5652 correct structure needed as input for the allocator.
5654 The checks for the completeness of the opcode must have already been
5659 if len(self.disks) != 2:
5660 raise errors.OpExecError("Only two-disk configurations supported")
5662 disk_space = _ComputeDiskSize(self.disk_template,
5663 self.disks[0]["size"], self.disks[1]["size"])
5665 if self.disk_template in constants.DTS_NET_MIRROR:
5666 self.required_nodes = 2
5668 self.required_nodes = 1
5672 "disk_template": self.disk_template,
5675 "vcpus": self.vcpus,
5676 "memory": self.mem_size,
5677 "disks": self.disks,
5678 "disk_space_total": disk_space,
5680 "required_nodes": self.required_nodes,
5682 data["request"] = request
5684 def _AddRelocateInstance(self):
5685 """Add relocate instance data to allocator structure.
5687 This in combination with _IAllocatorGetClusterData will create the
5688 correct structure needed as input for the allocator.
5690 The checks for the completeness of the opcode must have already been
5694 instance = self.cfg.GetInstanceInfo(self.name)
5695 if instance is None:
5696 raise errors.ProgrammerError("Unknown instance '%s' passed to"
5697 " IAllocator" % self.name)
5699 if instance.disk_template not in constants.DTS_NET_MIRROR:
5700 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5702 if len(instance.secondary_nodes) != 1:
5703 raise errors.OpPrereqError("Instance has not exactly one secondary node")
5705 self.required_nodes = 1
5707 disk_space = _ComputeDiskSize(instance.disk_template,
5708 instance.disks[0].size,
5709 instance.disks[1].size)
5714 "disk_space_total": disk_space,
5715 "required_nodes": self.required_nodes,
5716 "relocate_from": self.relocate_from,
5718 self.in_data["request"] = request
5720 def _BuildInputData(self):
5721 """Build input data structures.
5724 self._ComputeClusterData()
5726 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5727 self._AddNewInstance()
5729 self._AddRelocateInstance()
5731 self.in_text = serializer.Dump(self.in_data)
5733 def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5734 """Run an instance allocator and return the results.
5739 result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5741 if not isinstance(result, tuple) or len(result) != 4:
5742 raise errors.OpExecError("Invalid result from master iallocator runner")
5744 rcode, stdout, stderr, fail = result
5746 if rcode == constants.IARUN_NOTFOUND:
5747 raise errors.OpExecError("Can't find allocator '%s'" % name)
5748 elif rcode == constants.IARUN_FAILURE:
5749 raise errors.OpExecError("Instance allocator call failed: %s,"
5751 (fail, stdout+stderr))
5752 self.out_text = stdout
5754 self._ValidateResult()
5756 def _ValidateResult(self):
5757 """Process the allocator results.
5759 This will process and if successful save the result in
5760 self.out_data and the other parameters.
5764 rdict = serializer.Load(self.out_text)
5765 except Exception, err:
5766 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5768 if not isinstance(rdict, dict):
5769 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5771 for key in "success", "info", "nodes":
5772 if key not in rdict:
5773 raise errors.OpExecError("Can't parse iallocator results:"
5774 " missing key '%s'" % key)
5775 setattr(self, key, rdict[key])
5777 if not isinstance(rdict["nodes"], list):
5778 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5780 self.out_data = rdict
5783 class LUTestAllocator(NoHooksLU):
5784 """Run allocator tests.
5786 This LU runs the allocator tests
5789 _OP_REQP = ["direction", "mode", "name"]
5791 def CheckPrereq(self):
5792 """Check prerequisites.
5794 This checks the opcode parameters depending on the director and mode test.
5797 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5798 for attr in ["name", "mem_size", "disks", "disk_template",
5799 "os", "tags", "nics", "vcpus"]:
5800 if not hasattr(self.op, attr):
5801 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5803 iname = self.cfg.ExpandInstanceName(self.op.name)
5804 if iname is not None:
5805 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5807 if not isinstance(self.op.nics, list):
5808 raise errors.OpPrereqError("Invalid parameter 'nics'")
5809 for row in self.op.nics:
5810 if (not isinstance(row, dict) or
5813 "bridge" not in row):
5814 raise errors.OpPrereqError("Invalid contents of the"
5815 " 'nics' parameter")
5816 if not isinstance(self.op.disks, list):
5817 raise errors.OpPrereqError("Invalid parameter 'disks'")
5818 if len(self.op.disks) != 2:
5819 raise errors.OpPrereqError("Only two-disk configurations supported")
5820 for row in self.op.disks:
5821 if (not isinstance(row, dict) or
5822 "size" not in row or
5823 not isinstance(row["size"], int) or
5824 "mode" not in row or
5825 row["mode"] not in ['r', 'w']):
5826 raise errors.OpPrereqError("Invalid contents of the"
5827 " 'disks' parameter")
5828 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5829 if not hasattr(self.op, "name"):
5830 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5831 fname = self.cfg.ExpandInstanceName(self.op.name)
5833 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5835 self.op.name = fname
5836 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5838 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5841 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5842 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5843 raise errors.OpPrereqError("Missing allocator name")
5844 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5845 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5848 def Exec(self, feedback_fn):
5849 """Run the allocator test.
5852 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5853 ial = IAllocator(self.cfg, self.sstore,
5856 mem_size=self.op.mem_size,
5857 disks=self.op.disks,
5858 disk_template=self.op.disk_template,
5862 vcpus=self.op.vcpus,
5865 ial = IAllocator(self.cfg, self.sstore,
5868 relocate_from=list(self.relocate_from),
5871 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5872 result = ial.in_text
5874 ial.Run(self.op.allocator, validate=False)
5875 result = ial.out_text