4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement CheckPrereq which also fills in the opcode instance
53 with all the fields (even if as None)
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements (REQ_CLUSTER,
58 REQ_MASTER); note that all commands require root permissions
67 def __init__(self, processor, op, cfg, sstore):
68 """Constructor for LogicalUnit.
70 This needs to be overridden in derived classes in order to check op
78 for attr_name in self._OP_REQP:
79 attr_val = getattr(op, attr_name, None)
81 raise errors.OpPrereqError("Required parameter '%s' missing" %
84 if not cfg.IsCluster():
85 raise errors.OpPrereqError("Cluster not initialized yet,"
86 " use 'gnt-cluster init' first.")
88 master = sstore.GetMasterNode()
89 if master != utils.HostInfo().name:
90 raise errors.OpPrereqError("Commands must be run on the master"
93 def CheckPrereq(self):
94 """Check prerequisites for this LU.
96 This method should check that the prerequisites for the execution
97 of this LU are fulfilled. It can do internode communication, but
98 it should be idempotent - no cluster or system changes are
101 The method should raise errors.OpPrereqError in case something is
102 not fulfilled. Its return value is ignored.
104 This method should also update all the parameters of the opcode to
105 their canonical form; e.g. a short node name must be fully
106 expanded after this method has successfully completed (so that
107 hooks, logging, etc. work correctly).
110 raise NotImplementedError
112 def Exec(self, feedback_fn):
115 This method should implement the actual work. It should raise
116 errors.OpExecError for failures that are somewhat dealt with in
120 raise NotImplementedError
122 def BuildHooksEnv(self):
123 """Build hooks environment for this LU.
125 This method should return a three-element tuple consisting of: a dict
126 containing the environment that will be used for running the
127 specific hook for this LU, a list of node names on which the hook
128 should run before the execution, and a list of node names on which
129 the hook should run after the execution.
131 The keys of the dict must not be prefixed with 'GANETI_', as this
132 prefix is added by the hooks runner. Also note that additional keys
133 will be added by the hooks runner. If the LU doesn't define any
134 environment, an empty dict (and not None) should be returned.
136 When there are no nodes, an empty list (and not None) should be returned.
138 Note that if the HPATH for a LU class is None, this function will
142 raise NotImplementedError
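# Illustrative sketch only (not part of the original module): a minimal
# BuildHooksEnv for a hypothetical single-node LU would return the
# (env, pre-run nodes, post-run nodes) tuple described above, e.g.:
#
#   def BuildHooksEnv(self):
#     env = {"OP_TARGET": self.op.node_name}
#     return env, [], [self.op.node_name]
#
# The hooks runner adds the "GANETI_" prefix and its own extra keys.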
144 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
145 """Notify the LU about the results of its hooks.
147 This method is called every time a hooks phase is executed, and notifies
148 the Logical Unit about the hooks' result. The LU can then use it to alter
149 its result based on the hooks. By default the method does nothing and the
150 previous result is passed back unchanged but any LU can define it if it
151 wants to use the local cluster hook-scripts somehow.
154 phase: the hooks phase that has just been run
155 hooks_results: the results of the multi-node hooks rpc call
156 feedback_fn: function to send feedback back to the caller
157 lu_result: the previous result this LU had, or None in the PRE phase.
163 class NoHooksLU(LogicalUnit):
164 """Simple LU which runs no hooks.
166 This LU is intended as a parent for other LogicalUnits which will
167 run no hooks, in order to reduce duplicate code.
174 def _AddHostToEtcHosts(hostname):
175 """Wrapper around utils.SetEtcHostsEntry.
178 hi = utils.HostInfo(name=hostname)
179 utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
182 def _RemoveHostFromEtcHosts(hostname):
183 """Wrapper around utils.RemoveEtcHostsEntry.
186 hi = utils.HostInfo(name=hostname)
187 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
188 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
191 def _GetWantedNodes(lu, nodes):
192 """Returns list of checked and expanded node names.
195 nodes: List of nodes (strings) or None for all
198 if not isinstance(nodes, list):
199 raise errors.OpPrereqError("Invalid argument type 'nodes'")
205 node = lu.cfg.ExpandNodeName(name)
207 raise errors.OpPrereqError("No such node name '%s'" % name)
211 wanted = lu.cfg.GetNodeList()
212 return utils.NiceSort(wanted)
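# Usage sketch (mirrors calls made later in this module): LUs expand their
# opcode's node list in CheckPrereq, e.g.
#
#   self.nodes = _GetWantedNodes(self, self.op.nodes)
#
# An empty list selects all cluster nodes; unknown names raise OpPrereqError.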
215 def _GetWantedInstances(lu, instances):
216 """Returns list of checked and expanded instance names.
219 instances: List of instances (strings) or None for all
222 if not isinstance(instances, list):
223 raise errors.OpPrereqError("Invalid argument type 'instances'")
228 for name in instances:
229 instance = lu.cfg.ExpandInstanceName(name)
231 raise errors.OpPrereqError("No such instance name '%s'" % name)
232 wanted.append(instance)
235 wanted = lu.cfg.GetInstanceList()
236 return utils.NiceSort(wanted)
239 def _CheckOutputFields(static, dynamic, selected):
240 """Checks whether all selected fields are valid.
243 static: Static fields
244 dynamic: Dynamic fields
247 static_fields = frozenset(static)
248 dynamic_fields = frozenset(dynamic)
250 all_fields = static_fields | dynamic_fields
252 if not all_fields.issuperset(selected):
253 raise errors.OpPrereqError("Unknown output fields selected: %s"
254 % ",".join(frozenset(selected).
255 difference(all_fields)))
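# Usage sketch (this exact pattern appears in the query LUs below):
#
#   _CheckOutputFields(static=["node"],
#                      dynamic=["phys", "vg", "name", "size", "instance"],
#                      selected=self.op.output_fields)
#
# Any selected field outside the static|dynamic union raises OpPrereqError.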
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259 memory, vcpus, nics):
260 """Builds instance related env variables for hooks from single variables.
263 secondary_nodes: List of secondary nodes as strings
267 "INSTANCE_NAME": name,
268 "INSTANCE_PRIMARY": primary_node,
269 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270 "INSTANCE_OS_TYPE": os_type,
271 "INSTANCE_STATUS": status,
272 "INSTANCE_MEMORY": memory,
273 "INSTANCE_VCPUS": vcpus,
277 nic_count = len(nics)
278 for idx, (ip, bridge, mac) in enumerate(nics):
281 env["INSTANCE_NIC%d_IP" % idx] = ip
282 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
287 env["INSTANCE_NIC_COUNT"] = nic_count
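# Example of the resulting hook environment (illustrative values only):
#
#   {"INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1.example.com",
#    "INSTANCE_SECONDARIES": "node2.example.com",
#    "INSTANCE_OS_TYPE": "debian-etch",
#    "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 512,
#    "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "192.0.2.10",
#    "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#    "INSTANCE_NIC_COUNT": 1}
#
# The hooks runner later prepends "GANETI_" to each key.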
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293 """Builds instance related env variables for hooks from an object.
296 instance: objects.Instance object of instance
297 override: dict of values to override
300 'name': instance.name,
301 'primary_node': instance.primary_node,
302 'secondary_nodes': instance.secondary_nodes,
303 'os_type': instance.os,
304 'status': instance.status,
305 'memory': instance.memory,
306 'vcpus': instance.vcpus,
307 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
310 args.update(override)
311 return _BuildInstanceHookEnv(**args)
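# Usage sketch (hypothetical call): callers may override selected values,
# for instance to advertise the target state during a status change:
#
#   env = _BuildInstanceHookEnvByObject(instance, override={"status": "down"})
#
# Keys given in 'override' replace the values taken from the instance object.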
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315 """Ensure a node has a correct known_hosts entry.
318 fullnode - Fully qualified domain name of host. (str)
319 ip - IPv4 address of host (str)
320 pubkey - the public key of the cluster
323 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
326 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
335 logger.Debug('read %s' % (repr(rawline),))
337 parts = rawline.rstrip('\r\n').split()
339 # Ignore unwanted lines
340 if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341 fields = parts[0].split(',')
346 for spec in [ ip, fullnode ]:
347 if spec not in fields:
352 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353 if haveall and key == pubkey:
355 save_lines.append(rawline)
356 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
359 if havesome and (not haveall or key != pubkey):
361 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
364 save_lines.append(rawline)
367 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
371 save_lines = save_lines + add_lines
373 # Write a new file and replace old.
374 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
376 newfile = os.fdopen(fd, 'w')
378 newfile.write(''.join(save_lines))
381 logger.Debug("Wrote new known_hosts.")
382 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
385 # Simply appending a new line will do the trick.
387 for add in add_lines:
393 def _HasValidVG(vglist, vgname):
394 """Checks if the volume group list is valid.
396 A non-None return value means there's an error, and the return value
397 is the error message.
400 vgsize = vglist.get(vgname, None)
402 return "volume group '%s' missing" % vgname
404 return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
409 def _InitSSHSetup(node):
410 """Setup the SSH configuration for the cluster.
413 This generates a dsa keypair for root, adds the pub key to the
414 permitted hosts and adds the hostkey to its own known hosts.
417 node: the name of this host as a fqdn
420 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
422 for name in priv_key, pub_key:
423 if os.path.exists(name):
424 utils.CreateBackup(name)
425 utils.RemoveFile(name)
427 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
431 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
434 f = open(pub_key, 'r')
436 utils.AddAuthorizedKey(auth_keys, f.read(8192))
441 def _InitGanetiServerSetup(ss):
442 """Setup the necessary configuration for the initial node daemon.
444 This creates the nodepass file containing the shared password for
445 the cluster and also generates the SSL certificate.
448 # Create pseudo random password
449 randpass = sha.new(os.urandom(64)).hexdigest()
450 # and write it into sstore
451 ss.SetKey(ss.SS_NODED_PASS, randpass)
453 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454 "-days", str(365*5), "-nodes", "-x509",
455 "-keyout", constants.SSL_CERT_FILE,
456 "-out", constants.SSL_CERT_FILE, "-batch"])
458 raise errors.OpExecError("could not generate server ssl cert, command"
459 " %s had exitcode %s and error message %s" %
460 (result.cmd, result.exit_code, result.output))
462 os.chmod(constants.SSL_CERT_FILE, 0400)
464 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
467 raise errors.OpExecError("Could not start the node daemon, command %s"
468 " had exitcode %s and error %s" %
469 (result.cmd, result.exit_code, result.output))
472 def _CheckInstanceBridgesExist(instance):
473 """Check that the brigdes needed by an instance exist.
476 # check bridges existance
477 brlist = [nic.bridge for nic in instance.nics]
478 if not rpc.call_bridges_exist(instance.primary_node, brlist):
479 raise errors.OpPrereqError("one or more target bridges %s does not"
480 " exist on destination node '%s'" %
481 (brlist, instance.primary_node))
484 class LUInitCluster(LogicalUnit):
485 """Initialise the cluster.
488 HPATH = "cluster-init"
489 HTYPE = constants.HTYPE_CLUSTER
490 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491 "def_bridge", "master_netdev"]
494 def BuildHooksEnv(self):
497 Notes: Since we don't require a cluster, we must manually add
498 ourselves to the post-run node list.
501 env = {"OP_TARGET": self.op.cluster_name}
502 return env, [], [self.hostname.name]
504 def CheckPrereq(self):
505 """Verify that the passed name is a valid one.
508 if config.ConfigWriter.IsCluster():
509 raise errors.OpPrereqError("Cluster is already initialised")
511 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512 if not os.path.exists(constants.VNC_PASSWORD_FILE):
513 raise errors.OpPrereqError("Please prepare the cluster VNC"
515 constants.VNC_PASSWORD_FILE)
517 self.hostname = hostname = utils.HostInfo()
519 if hostname.ip.startswith("127."):
520 raise errors.OpPrereqError("This host's IP resolves to the private"
521 " range (%s). Please fix DNS or %s." %
522 (hostname.ip, constants.ETC_HOSTS))
524 if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525 source=constants.LOCALHOST_IP_ADDRESS):
526 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527 " to %s,\nbut this ip address does not"
528 " belong to this host."
529 " Aborting." % hostname.ip)
531 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
533 if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
535 raise errors.OpPrereqError("Cluster IP already active. Aborting.")
537 secondary_ip = getattr(self.op, "secondary_ip", None)
538 if secondary_ip and not utils.IsValidIP(secondary_ip):
539 raise errors.OpPrereqError("Invalid secondary ip given")
541 secondary_ip != hostname.ip and
542 (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543 source=constants.LOCALHOST_IP_ADDRESS))):
544 raise errors.OpPrereqError("You gave %s as secondary IP,"
545 " but it does not belong to this host." %
547 self.secondary_ip = secondary_ip
549 # checks presence of the volume group given
550 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
553 raise errors.OpPrereqError("Error: %s" % vgstatus)
555 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
557 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
560 if self.op.hypervisor_type not in constants.HYPER_TYPES:
561 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562 self.op.hypervisor_type)
564 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
566 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567 (self.op.master_netdev,
568 result.output.strip()))
570 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572 raise errors.OpPrereqError("Init.d script '%s' missing or not"
573 " executable." % constants.NODE_INITD_SCRIPT)
575 def Exec(self, feedback_fn):
576 """Initialize the cluster.
579 clustername = self.clustername
580 hostname = self.hostname
582 # set up the simple store
583 self.sstore = ss = ssconf.SimpleStore()
584 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
590 # set up the inter-node password and certificate
591 _InitGanetiServerSetup(ss)
593 # start the master ip
594 rpc.call_node_start_master(hostname.name)
596 # set up ssh config and /etc/hosts
597 f = open(constants.SSH_HOST_RSA_PUB, 'r')
602 sshkey = sshline.split(" ")[1]
604 _AddHostToEtcHosts(hostname.name)
606 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
608 _InitSSHSetup(hostname.name)
610 # init of cluster config file
611 self.cfg = cfgw = config.ConfigWriter()
612 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613 sshkey, self.op.mac_prefix,
614 self.op.vg_name, self.op.def_bridge)
617 class LUDestroyCluster(NoHooksLU):
618 """Logical unit for destroying the cluster.
623 def CheckPrereq(self):
624 """Check prerequisites.
626 This checks whether the cluster is empty.
628 Any errors are signalled by raising errors.OpPrereqError.
631 master = self.sstore.GetMasterNode()
633 nodelist = self.cfg.GetNodeList()
634 if len(nodelist) != 1 or nodelist[0] != master:
635 raise errors.OpPrereqError("There are still %d node(s) in"
636 " this cluster." % (len(nodelist) - 1))
637 instancelist = self.cfg.GetInstanceList()
639 raise errors.OpPrereqError("There are still %d instance(s) in"
640 " this cluster." % len(instancelist))
642 def Exec(self, feedback_fn):
643 """Destroys the cluster.
646 master = self.sstore.GetMasterNode()
647 if not rpc.call_node_stop_master(master):
648 raise errors.OpExecError("Could not disable the master role")
649 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650 utils.CreateBackup(priv_key)
651 utils.CreateBackup(pub_key)
652 rpc.call_node_leave_cluster(master)
655 class LUVerifyCluster(LogicalUnit):
656 """Verifies the cluster status.
659 HPATH = "cluster-verify"
660 HTYPE = constants.HTYPE_CLUSTER
661 _OP_REQP = ["skip_checks"]
663 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664 remote_version, feedback_fn):
665 """Run multiple tests against a node.
668 - compares ganeti version
669 - checks vg existence and size > 20G
670 - checks config file checksum
671 - checks ssh to other nodes
674 node: name of the node to check
675 file_list: required list of files
676 local_cksum: dictionary of local files and their checksums
679 # compares ganeti version
680 local_version = constants.PROTOCOL_VERSION
681 if not remote_version:
682 feedback_fn(" - ERROR: connection to %s failed" % (node))
685 if local_version != remote_version:
686 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
687 (local_version, node, remote_version))
690 # checks vg existence and size > 20G
694 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
698 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
700 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
703 # checks config file checksum
706 if 'filelist' not in node_result:
708 feedback_fn(" - ERROR: node hasn't returned file checksum data")
710 remote_cksum = node_result['filelist']
711 for file_name in file_list:
712 if file_name not in remote_cksum:
714 feedback_fn(" - ERROR: file '%s' missing" % file_name)
715 elif remote_cksum[file_name] != local_cksum[file_name]:
717 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
719 if 'nodelist' not in node_result:
721 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
723 if node_result['nodelist']:
725 for node in node_result['nodelist']:
726 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
727 (node, node_result['nodelist'][node]))
728 if 'node-net-test' not in node_result:
730 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
732 if node_result['node-net-test']:
734 nlist = utils.NiceSort(node_result['node-net-test'].keys())
736 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
737 (node, node_result['node-net-test'][node]))
739 hyp_result = node_result.get('hypervisor', None)
740 if hyp_result is not None:
741 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
744 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
745 node_instance, feedback_fn):
746 """Verify an instance.
748 This function checks to see if the required block devices are
749 available on the instance's node.
754 node_current = instanceconfig.primary_node
757 instanceconfig.MapLVsByNode(node_vol_should)
759 for node in node_vol_should:
760 for volume in node_vol_should[node]:
761 if node not in node_vol_is or volume not in node_vol_is[node]:
762 feedback_fn(" - ERROR: volume %s missing on node %s" %
766 if instanceconfig.status != 'down':
767 if (node_current not in node_instance or
768 not instance in node_instance[node_current]):
769 feedback_fn(" - ERROR: instance %s not running on node %s" %
770 (instance, node_current))
773 for node in node_instance:
774 if node != node_current:
775 if instance in node_instance[node]:
776 feedback_fn(" - ERROR: instance %s should not run on node %s" %
782 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
783 """Verify if there are any unknown volumes in the cluster.
785 The .os, .swap and backup volumes are ignored. All other volumes are
791 for node in node_vol_is:
792 for volume in node_vol_is[node]:
793 if node not in node_vol_should or volume not in node_vol_should[node]:
794 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
799 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
800 """Verify the list of running instances.
802 This checks what instances are running but unknown to the cluster.
806 for node in node_instance:
807 for runninginstance in node_instance[node]:
808 if runninginstance not in instancelist:
809 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
810 (runninginstance, node))
814 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
815 """Verify N+1 Memory Resilience.
817 Check that if one single node dies we can still start all the instances it was primary for.
823 for node, nodeinfo in node_info.iteritems():
824 # This code checks that every node which is now listed as secondary has
825 # enough memory to host all instances it is supposed to, should a single
826 # other node in the cluster fail.
827 # FIXME: not ready for failover to an arbitrary node
828 # FIXME: does not support file-backed instances
829 # WARNING: we currently take into account down instances as well as up
830 # ones, considering that even if they're down someone might want to start
831 # them even in the event of a node failure.
832 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
834 for instance in instances:
835 needed_mem += instance_cfg[instance].memory
836 if nodeinfo['mfree'] < needed_mem:
837 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
838 " failovers should node %s fail" % (node, prinode))
842 def CheckPrereq(self):
843 """Check prerequisites.
845 Transform the list of checks we're going to skip into a set and check that
846 all its members are valid.
849 self.skip_set = frozenset(self.op.skip_checks)
850 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
851 raise errors.OpPrereqError("Invalid checks to be skipped specified")
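# Example opcode usage (a sketch; assumes the corresponding opcode class is
# opcodes.OpVerifyCluster): skipping the optional N+1 memory check:
#
#   op = opcodes.OpVerifyCluster(skip_checks=[constants.VERIFY_NPLUSONE_MEM])
#
# Values outside constants.VERIFY_OPTIONAL_CHECKS are rejected here.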
853 def BuildHooksEnv(self):
856 Cluster-Verify hooks just run in the post phase; their failure makes
857 their output be logged in the verify output and the verification fail.
860 all_nodes = self.cfg.GetNodeList()
861 tags = self.cfg.GetClusterInfo().GetTags()
862 # TODO: populate the environment with useful information for verify hooks
864 "CLUSTER_TAGS": " ".join(tags),
866 return env, [], all_nodes
868 def Exec(self, feedback_fn):
869 """Verify integrity of cluster, performing various test on nodes.
873 feedback_fn("* Verifying global settings")
874 for msg in self.cfg.VerifyConfig():
875 feedback_fn(" - ERROR: %s" % msg)
877 vg_name = self.cfg.GetVGName()
878 nodelist = utils.NiceSort(self.cfg.GetNodeList())
879 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
880 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
881 i_non_redundant = [] # Non redundant instances
887 # FIXME: verify OS list
889 file_names = list(self.sstore.GetFileList())
890 file_names.append(constants.SSL_CERT_FILE)
891 file_names.append(constants.CLUSTER_CONF_FILE)
892 local_checksums = utils.FingerprintFiles(file_names)
894 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
895 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
896 all_instanceinfo = rpc.call_instance_list(nodelist)
897 all_vglist = rpc.call_vg_list(nodelist)
898 node_verify_param = {
899 'filelist': file_names,
900 'nodelist': nodelist,
902 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
903 for node in nodeinfo]
905 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
906 all_rversion = rpc.call_version(nodelist)
907 all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
909 incomplete_nodeinfo = False
911 for node in nodelist:
912 feedback_fn("* Verifying node %s" % node)
913 result = self._VerifyNode(node, file_names, local_checksums,
914 all_vglist[node], all_nvinfo[node],
915 all_rversion[node], feedback_fn)
919 volumeinfo = all_volumeinfo[node]
921 if isinstance(volumeinfo, basestring):
922 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
923 (node, volumeinfo[-400:].encode('string_escape')))
925 node_volume[node] = {}
926 elif not isinstance(volumeinfo, dict):
927 feedback_fn(" - ERROR: connection to %s failed" % (node,))
929 incomplete_nodeinfo = True
932 node_volume[node] = volumeinfo
935 nodeinstance = all_instanceinfo[node]
936 if type(nodeinstance) != list:
937 feedback_fn(" - ERROR: connection to %s failed" % (node,))
939 incomplete_nodeinfo = True
942 node_instance[node] = nodeinstance
945 nodeinfo = all_ninfo[node]
946 if not isinstance(nodeinfo, dict):
947 feedback_fn(" - ERROR: connection to %s failed" % (node,))
949 incomplete_nodeinfo = True
954 "mfree": int(nodeinfo['memory_free']),
955 "dfree": int(nodeinfo['vg_free']),
958 # dictionary holding all instances this node is secondary for,
959 # grouped by their primary node. Each key is a cluster node, and each
960 # value is a list of instances which have the key as primary and the
961 # current node as secondary. this is handy to calculate N+1 memory
962 # availability if you can only failover from a primary to its
964 "sinst-by-pnode": {},
966 except (ValueError, TypeError):
967 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
969 incomplete_nodeinfo = True
974 for instance in instancelist:
975 feedback_fn("* Verifying instance %s" % instance)
976 inst_config = self.cfg.GetInstanceInfo(instance)
977 result = self._VerifyInstance(instance, inst_config, node_volume,
978 node_instance, feedback_fn)
981 inst_config.MapLVsByNode(node_vol_should)
983 instance_cfg[instance] = inst_config
985 pnode = inst_config.primary_node
986 if pnode in node_info:
987 node_info[pnode]['pinst'].append(instance)
989 feedback_fn(" - ERROR: instance %s, connection to primary node"
990 " %s failed" % (instance, pnode))
993 # If the instance is non-redundant we cannot survive losing its primary
994 # node, so we are not N+1 compliant. On the other hand we have no disk
995 # templates with more than one secondary so that situation is not well
997 # FIXME: does not support file-backed instances
998 if len(inst_config.secondary_nodes) == 0:
999 i_non_redundant.append(instance)
1000 elif len(inst_config.secondary_nodes) > 1:
1001 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1004 for snode in inst_config.secondary_nodes:
1005 if snode in node_info:
1006 node_info[snode]['sinst'].append(instance)
1007 if pnode not in node_info[snode]['sinst-by-pnode']:
1008 node_info[snode]['sinst-by-pnode'][pnode] = []
1009 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1011 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1012 " %s failed" % (instance, snode))
1014 feedback_fn("* Verifying orphan volumes")
1015 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1019 feedback_fn("* Verifying remaining instances")
1020 result = self._VerifyOrphanInstances(instancelist, node_instance,
1024 if (constants.VERIFY_NPLUSONE_MEM not in self.skip_set and
1025 not incomplete_nodeinfo):
1026 feedback_fn("* Verifying N+1 Memory redundancy")
1027 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1030 feedback_fn("* Other Notes")
1032 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1033 % len(i_non_redundant))
1037 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1038 """Analize the post-hooks' result, handle it, and send some
1039 nicely-formatted feedback back to the user.
1042 phase: the hooks phase that has just been run
1043 hooks_results: the results of the multi-node hooks rpc call
1044 feedback_fn: function to send feedback back to the caller
1045 lu_result: previous Exec result
1048 # We only really run POST phase hooks, and are only interested in their results
1049 if phase == constants.HOOKS_PHASE_POST:
1050 # Used to change hooks' output to proper indentation
1051 indent_re = re.compile('^', re.M)
1052 feedback_fn("* Hooks Results")
1053 if not hooks_results:
1054 feedback_fn(" - ERROR: general communication failure")
1057 for node_name in hooks_results:
1058 show_node_header = True
1059 res = hooks_results[node_name]
1060 if res is False or not isinstance(res, list):
1061 feedback_fn(" Communication failure")
1064 for script, hkr, output in res:
1065 if hkr == constants.HKR_FAIL:
1066 # The node header is only shown once, if there are
1067 # failing hooks on that node
1068 if show_node_header:
1069 feedback_fn(" Node %s:" % node_name)
1070 show_node_header = False
1071 feedback_fn(" ERROR: Script %s failed, output:" % script)
1072 output = indent_re.sub(' ', output)
1073 feedback_fn("%s" % output)
1079 class LUVerifyDisks(NoHooksLU):
1080 """Verifies the cluster disks status.
1085 def CheckPrereq(self):
1086 """Check prerequisites.
1088 This has no prerequisites.
1093 def Exec(self, feedback_fn):
1094 """Verify integrity of cluster disks.
1097 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1099 vg_name = self.cfg.GetVGName()
1100 nodes = utils.NiceSort(self.cfg.GetNodeList())
1101 instances = [self.cfg.GetInstanceInfo(name)
1102 for name in self.cfg.GetInstanceList()]
1105 for inst in instances:
1107 if (inst.status != "up" or
1108 inst.disk_template not in constants.DTS_NET_MIRROR):
1110 inst.MapLVsByNode(inst_lvs)
1111 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1112 for node, vol_list in inst_lvs.iteritems():
1113 for vol in vol_list:
1114 nv_dict[(node, vol)] = inst
1119 node_lvs = rpc.call_volume_list(nodes, vg_name)
1124 lvs = node_lvs[node]
1126 if isinstance(lvs, basestring):
1127 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1128 res_nlvm[node] = lvs
1129 elif not isinstance(lvs, dict):
1130 logger.Info("connection to node %s failed or invalid data returned" %
1132 res_nodes.append(node)
1135 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1136 inst = nv_dict.pop((node, lv_name), None)
1137 if (not lv_online and inst is not None
1138 and inst.name not in res_instances):
1139 res_instances.append(inst.name)
1141 # any leftover items in nv_dict are missing LVs, let's arrange the data better
1143 for key, inst in nv_dict.iteritems():
1144 if inst.name not in res_missing:
1145 res_missing[inst.name] = []
1146 res_missing[inst.name].append(key)
1151 class LURenameCluster(LogicalUnit):
1152 """Rename the cluster.
1155 HPATH = "cluster-rename"
1156 HTYPE = constants.HTYPE_CLUSTER
1159 def BuildHooksEnv(self):
1164 "OP_TARGET": self.sstore.GetClusterName(),
1165 "NEW_NAME": self.op.name,
1167 mn = self.sstore.GetMasterNode()
1168 return env, [mn], [mn]
1170 def CheckPrereq(self):
1171 """Verify that the passed name is a valid one.
1174 hostname = utils.HostInfo(self.op.name)
1176 new_name = hostname.name
1177 self.ip = new_ip = hostname.ip
1178 old_name = self.sstore.GetClusterName()
1179 old_ip = self.sstore.GetMasterIP()
1180 if new_name == old_name and new_ip == old_ip:
1181 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1182 " cluster has changed")
1183 if new_ip != old_ip:
1184 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1185 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1186 " reachable on the network. Aborting." %
1189 self.op.name = new_name
1191 def Exec(self, feedback_fn):
1192 """Rename the cluster.
1195 clustername = self.op.name
1199 # shutdown the master IP
1200 master = ss.GetMasterNode()
1201 if not rpc.call_node_stop_master(master):
1202 raise errors.OpExecError("Could not disable the master role")
1206 ss.SetKey(ss.SS_MASTER_IP, ip)
1207 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1209 # Distribute updated ss config to all nodes
1210 myself = self.cfg.GetNodeInfo(master)
1211 dist_nodes = self.cfg.GetNodeList()
1212 if myself.name in dist_nodes:
1213 dist_nodes.remove(myself.name)
1215 logger.Debug("Copying updated ssconf data to all nodes")
1216 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1217 fname = ss.KeyToFilename(keyname)
1218 result = rpc.call_upload_file(dist_nodes, fname)
1219 for to_node in dist_nodes:
1220 if not result[to_node]:
1221 logger.Error("copy of file %s to node %s failed" %
1224 if not rpc.call_node_start_master(master):
1225 logger.Error("Could not re-enable the master role on the master,"
1226 " please restart manually.")
1229 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1230 """Sleep and poll for an instance's disk to sync.
1233 if not instance.disks:
1237 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1239 node = instance.primary_node
1241 for dev in instance.disks:
1242 cfgw.SetDiskID(dev, node)
1248 cumul_degraded = False
1249 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1251 proc.LogWarning("Can't get any data from node %s" % node)
1254 raise errors.RemoteError("Can't contact node %s for mirror data,"
1255 " aborting." % node)
1259 for i in range(len(rstats)):
1262 proc.LogWarning("Can't compute data for node %s/%s" %
1263 (node, instance.disks[i].iv_name))
1265 # we ignore the ldisk parameter
1266 perc_done, est_time, is_degraded, _ = mstat
1267 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1268 if perc_done is not None:
1270 if est_time is not None:
1271 rem_time = "%d estimated seconds remaining" % est_time
1274 rem_time = "no time estimate"
1275 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1276 (instance.disks[i].iv_name, perc_done, rem_time))
1283 time.sleep(min(60, max_time))
1289 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1290 return not cumul_degraded
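# Usage sketch (hypothetical): an LU that has just created or replaced
# mirrored disks would typically block until the mirrors have caught up:
#
#   if not _WaitForSync(self.cfg, instance, self.proc):
#     raise errors.OpExecError("Disks are degraded after sync")
#
# Assumes the LU exposes its processor as self.proc.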
1293 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1294 """Check that mirrors are not degraded.
1296 The ldisk parameter, if True, will change the test from the
1297 is_degraded attribute (which represents overall non-ok status for
1298 the device(s)) to the ldisk (representing the local storage status).
1301 cfgw.SetDiskID(dev, node)
1308 if on_primary or dev.AssembleOnSecondary():
1309 rstats = rpc.call_blockdev_find(node, dev)
1311 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1314 result = result and (not rstats[idx])
1316 for child in dev.children:
1317 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1322 class LUDiagnoseOS(NoHooksLU):
1323 """Logical unit for OS diagnose/query.
1326 _OP_REQP = ["output_fields", "names"]
1328 def CheckPrereq(self):
1329 """Check prerequisites.
1331 This always succeeds, since this is a pure query LU.
1335 raise errors.OpPrereqError("Selective OS query not supported")
1337 self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1338 _CheckOutputFields(static=[],
1339 dynamic=self.dynamic_fields,
1340 selected=self.op.output_fields)
1343 def _DiagnoseByOS(node_list, rlist):
1344 """Remaps a per-node return list into an a per-os per-node dictionary
1347 node_list: a list with the names of all nodes
1348 rlist: a map with node names as keys and OS objects as values
1351 map: a map with osnames as keys and as value another map, with
1353 keys and list of OS objects as values
1354 e.g. {"debian-etch": {"node1": [<object>,...],
1355 "node2": [<object>,]}
1360 for node_name, nr in rlist.iteritems():
1364 if os.name not in all_os:
1365 # build a list of nodes for this os containing empty lists
1366 # for each node in node_list
1367 all_os[os.name] = {}
1368 for nname in node_list:
1369 all_os[os.name][nname] = []
1370 all_os[os.name][node_name].append(os)
1373 def Exec(self, feedback_fn):
1374 """Compute the list of OSes.
1377 node_list = self.cfg.GetNodeList()
1378 node_data = rpc.call_os_diagnose(node_list)
1379 if node_data == False:
1380 raise errors.OpExecError("Can't gather the list of OSes")
1381 pol = self._DiagnoseByOS(node_list, node_data)
1383 for os_name, os_data in pol.iteritems():
1385 for field in self.op.output_fields:
1388 elif field == "valid":
1389 val = utils.all([osl and osl[0] for osl in os_data.values()])
1390 elif field == "node_status":
1392 for node_name, nos_list in os_data.iteritems():
1393 val[node_name] = [(v.status, v.path) for v in nos_list]
1395 raise errors.ParameterError(field)
1402 class LURemoveNode(LogicalUnit):
1403 """Logical unit for removing a node.
1406 HPATH = "node-remove"
1407 HTYPE = constants.HTYPE_NODE
1408 _OP_REQP = ["node_name"]
1410 def BuildHooksEnv(self):
1413 This doesn't run on the target node in the pre phase, as a failed
1414 node would not allow itself to run.
1418 "OP_TARGET": self.op.node_name,
1419 "NODE_NAME": self.op.node_name,
1421 all_nodes = self.cfg.GetNodeList()
1422 all_nodes.remove(self.op.node_name)
1423 return env, all_nodes, all_nodes
1425 def CheckPrereq(self):
1426 """Check prerequisites.
1429 - the node exists in the configuration
1430 - it does not have primary or secondary instances
1431 - it's not the master
1433 Any errors are signalled by raising errors.OpPrereqError.
1436 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1438 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1440 instance_list = self.cfg.GetInstanceList()
1442 masternode = self.sstore.GetMasterNode()
1443 if node.name == masternode:
1444 raise errors.OpPrereqError("Node is the master node,"
1445 " you need to failover first.")
1447 for instance_name in instance_list:
1448 instance = self.cfg.GetInstanceInfo(instance_name)
1449 if node.name == instance.primary_node:
1450 raise errors.OpPrereqError("Instance %s still running on the node,"
1451 " please remove first." % instance_name)
1452 if node.name in instance.secondary_nodes:
1453 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1454 " please remove first." % instance_name)
1455 self.op.node_name = node.name
1458 def Exec(self, feedback_fn):
1459 """Removes the node from the cluster.
1463 logger.Info("stopping the node daemon and removing configs from node %s" %
1466 rpc.call_node_leave_cluster(node.name)
1468 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1470 logger.Info("Removing node %s from config" % node.name)
1472 self.cfg.RemoveNode(node.name)
1474 _RemoveHostFromEtcHosts(node.name)
1477 class LUQueryNodes(NoHooksLU):
1478 """Logical unit for querying nodes.
1481 _OP_REQP = ["output_fields", "names"]
1483 def CheckPrereq(self):
1484 """Check prerequisites.
1486 This checks that the fields required are valid output fields.
1489 self.dynamic_fields = frozenset([
1491 "mtotal", "mnode", "mfree",
1496 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1497 "pinst_list", "sinst_list",
1498 "pip", "sip", "tags"],
1499 dynamic=self.dynamic_fields,
1500 selected=self.op.output_fields)
1502 self.wanted = _GetWantedNodes(self, self.op.names)
1504 def Exec(self, feedback_fn):
1505 """Computes the list of nodes and their attributes.
1508 nodenames = self.wanted
1509 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1511 # begin data gathering
1513 if self.dynamic_fields.intersection(self.op.output_fields):
1515 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1516 for name in nodenames:
1517 nodeinfo = node_data.get(name, None)
1520 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1521 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1522 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1523 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1524 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1525 "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1526 "bootid": nodeinfo['bootid'],
1529 live_data[name] = {}
1531 live_data = dict.fromkeys(nodenames, {})
1533 node_to_primary = dict([(name, set()) for name in nodenames])
1534 node_to_secondary = dict([(name, set()) for name in nodenames])
1536 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1537 "sinst_cnt", "sinst_list"))
1538 if inst_fields & frozenset(self.op.output_fields):
1539 instancelist = self.cfg.GetInstanceList()
1541 for instance_name in instancelist:
1542 inst = self.cfg.GetInstanceInfo(instance_name)
1543 if inst.primary_node in node_to_primary:
1544 node_to_primary[inst.primary_node].add(inst.name)
1545 for secnode in inst.secondary_nodes:
1546 if secnode in node_to_secondary:
1547 node_to_secondary[secnode].add(inst.name)
1549 # end data gathering
1552 for node in nodelist:
1554 for field in self.op.output_fields:
1557 elif field == "pinst_list":
1558 val = list(node_to_primary[node.name])
1559 elif field == "sinst_list":
1560 val = list(node_to_secondary[node.name])
1561 elif field == "pinst_cnt":
1562 val = len(node_to_primary[node.name])
1563 elif field == "sinst_cnt":
1564 val = len(node_to_secondary[node.name])
1565 elif field == "pip":
1566 val = node.primary_ip
1567 elif field == "sip":
1568 val = node.secondary_ip
1569 elif field == "tags":
1570 val = list(node.GetTags())
1571 elif field in self.dynamic_fields:
1572 val = live_data[node.name].get(field, None)
1574 raise errors.ParameterError(field)
1575 node_output.append(val)
1576 output.append(node_output)
1581 class LUQueryNodeVolumes(NoHooksLU):
1582 """Logical unit for getting volumes on node(s).
1585 _OP_REQP = ["nodes", "output_fields"]
1587 def CheckPrereq(self):
1588 """Check prerequisites.
1590 This checks that the fields required are valid output fields.
1593 self.nodes = _GetWantedNodes(self, self.op.nodes)
1595 _CheckOutputFields(static=["node"],
1596 dynamic=["phys", "vg", "name", "size", "instance"],
1597 selected=self.op.output_fields)
1600 def Exec(self, feedback_fn):
1601 """Computes the list of nodes and their attributes.
1604 nodenames = self.nodes
1605 volumes = rpc.call_node_volumes(nodenames)
1607 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1608 in self.cfg.GetInstanceList()]
1610 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1613 for node in nodenames:
1614 if node not in volumes or not volumes[node]:
1617 node_vols = volumes[node][:]
1618 node_vols.sort(key=lambda vol: vol['dev'])
1620 for vol in node_vols:
1622 for field in self.op.output_fields:
1625 elif field == "phys":
1629 elif field == "name":
1631 elif field == "size":
1632 val = int(float(vol['size']))
1633 elif field == "instance":
1635 if node not in lv_by_node[inst]:
1637 if vol['name'] in lv_by_node[inst][node]:
1643 raise errors.ParameterError(field)
1644 node_output.append(str(val))
1646 output.append(node_output)
1651 class LUAddNode(LogicalUnit):
1652 """Logical unit for adding node to the cluster.
1656 HTYPE = constants.HTYPE_NODE
1657 _OP_REQP = ["node_name"]
1659 def BuildHooksEnv(self):
1662 This will run on all nodes before, and on all nodes + the new node after.
1666 "OP_TARGET": self.op.node_name,
1667 "NODE_NAME": self.op.node_name,
1668 "NODE_PIP": self.op.primary_ip,
1669 "NODE_SIP": self.op.secondary_ip,
1671 nodes_0 = self.cfg.GetNodeList()
1672 nodes_1 = nodes_0 + [self.op.node_name, ]
1673 return env, nodes_0, nodes_1
1675 def CheckPrereq(self):
1676 """Check prerequisites.
1679 - the new node is not already in the config
1681 - its parameters (single/dual homed) matches the cluster
1683 Any errors are signalled by raising errors.OpPrereqError.
1686 node_name = self.op.node_name
1689 dns_data = utils.HostInfo(node_name)
1691 node = dns_data.name
1692 primary_ip = self.op.primary_ip = dns_data.ip
1693 secondary_ip = getattr(self.op, "secondary_ip", None)
1694 if secondary_ip is None:
1695 secondary_ip = primary_ip
1696 if not utils.IsValidIP(secondary_ip):
1697 raise errors.OpPrereqError("Invalid secondary IP given")
1698 self.op.secondary_ip = secondary_ip
1700 node_list = cfg.GetNodeList()
1701 if not self.op.readd and node in node_list:
1702 raise errors.OpPrereqError("Node %s is already in the configuration" %
1704 elif self.op.readd and node not in node_list:
1705 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1707 for existing_node_name in node_list:
1708 existing_node = cfg.GetNodeInfo(existing_node_name)
1710 if self.op.readd and node == existing_node_name:
1711 if (existing_node.primary_ip != primary_ip or
1712 existing_node.secondary_ip != secondary_ip):
1713 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1714 " address configuration as before")
1717 if (existing_node.primary_ip == primary_ip or
1718 existing_node.secondary_ip == primary_ip or
1719 existing_node.primary_ip == secondary_ip or
1720 existing_node.secondary_ip == secondary_ip):
1721 raise errors.OpPrereqError("New node ip address(es) conflict with"
1722 " existing node %s" % existing_node.name)
1724 # check that the type of the node (single versus dual homed) is the
1725 # same as for the master
1726 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1727 master_singlehomed = myself.secondary_ip == myself.primary_ip
1728 newbie_singlehomed = secondary_ip == primary_ip
1729 if master_singlehomed != newbie_singlehomed:
1730 if master_singlehomed:
1731 raise errors.OpPrereqError("The master has no private ip but the"
1732 " new node has one")
1734 raise errors.OpPrereqError("The master has a private ip but the"
1735 " new node doesn't have one")
1737 # checks reachability
1738 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1739 raise errors.OpPrereqError("Node not reachable by ping")
1741 if not newbie_singlehomed:
1742 # check reachability from my secondary ip to newbie's secondary ip
1743 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1744 source=myself.secondary_ip):
1745 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1746 " based ping to noded port")
1748 self.new_node = objects.Node(name=node,
1749 primary_ip=primary_ip,
1750 secondary_ip=secondary_ip)
1752 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1753 if not os.path.exists(constants.VNC_PASSWORD_FILE):
1754 raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1755 constants.VNC_PASSWORD_FILE)
1757 def Exec(self, feedback_fn):
1758 """Adds the new node to the cluster.
1761 new_node = self.new_node
1762 node = new_node.name
1764 # set up the inter-node password and certificate and restart the node daemon
1765 gntpass = self.sstore.GetNodeDaemonPassword()
1766 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1767 raise errors.OpExecError("ganeti password corruption detected")
1768 f = open(constants.SSL_CERT_FILE)
1770 gntpem = f.read(8192)
1773 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1774 # so we use this to detect an invalid certificate; as long as the
1775 # cert doesn't contain this, the here-document will be correctly
1776 # parsed by the shell sequence below
1777 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1778 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1779 if not gntpem.endswith("\n"):
1780 raise errors.OpExecError("PEM must end with newline")
1781 logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1783 # and then connect with ssh to set password and start ganeti-noded
1784 # note that all the below variables are sanitized at this point,
1785 # either by being constants or by the checks above
1787 mycommand = ("umask 077 && "
1788 "echo '%s' > '%s' && "
1789 "cat > '%s' << '!EOF.' && \n"
1790 "%s!EOF.\n%s restart" %
1791 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1792 constants.SSL_CERT_FILE, gntpem,
1793 constants.NODE_INITD_SCRIPT))
1795 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1797 raise errors.OpExecError("Remote command on node %s, error: %s,"
1799 (node, result.fail_reason, result.output))
1801 # check connectivity
1804 result = rpc.call_version([node])[node]
1806 if constants.PROTOCOL_VERSION == result:
1807 logger.Info("communication to node %s fine, sw version %s match" %
1810 raise errors.OpExecError("Version mismatch master version %s,"
1811 " node version %s" %
1812 (constants.PROTOCOL_VERSION, result))
1814 raise errors.OpExecError("Cannot get version from the new node")
1817 logger.Info("copy ssh key to node %s" % node)
1818 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1820 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1821 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1827 keyarray.append(f.read())
1831 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1832 keyarray[3], keyarray[4], keyarray[5])
1835 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1837 # Add node to our /etc/hosts, and add key to known_hosts
1838 _AddHostToEtcHosts(new_node.name)
1840 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1841 self.cfg.GetHostKey())
1843 if new_node.secondary_ip != new_node.primary_ip:
1844 if not rpc.call_node_tcp_ping(new_node.name,
1845 constants.LOCALHOST_IP_ADDRESS,
1846 new_node.secondary_ip,
1847 constants.DEFAULT_NODED_PORT,
1849 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1850 " you gave (%s). Please fix and re-run this"
1851 " command." % new_node.secondary_ip)
1853 success, msg = ssh.VerifyNodeHostname(node)
1855 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1856 " than the one the resolver gives: %s."
1857 " Please fix and re-run this command." %
1860 # Distribute updated /etc/hosts and known_hosts to all nodes,
1861 # including the node just added
1862 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1863 dist_nodes = self.cfg.GetNodeList()
1864 if not self.op.readd:
1865 dist_nodes.append(node)
1866 if myself.name in dist_nodes:
1867 dist_nodes.remove(myself.name)
1869 logger.Debug("Copying hosts and known_hosts to all nodes")
1870 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1871 result = rpc.call_upload_file(dist_nodes, fname)
1872 for to_node in dist_nodes:
1873 if not result[to_node]:
1874 logger.Error("copy of file %s to node %s failed" %
1877 to_copy = ss.GetFileList()
1878 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1879 to_copy.append(constants.VNC_PASSWORD_FILE)
1880 for fname in to_copy:
1881 if not ssh.CopyFileToNode(node, fname):
1882 logger.Error("could not copy file %s to node %s" % (fname, node))
1884 if not self.op.readd:
1885 logger.Info("adding node %s to cluster.conf" % node)
1886 self.cfg.AddNode(new_node)
1889 class LUMasterFailover(LogicalUnit):
1890 """Failover the master node to the current node.
1892 This is a special LU in that it must run on a non-master node.
1895 HPATH = "master-failover"
1896 HTYPE = constants.HTYPE_CLUSTER
1900 def BuildHooksEnv(self):
1903 This will run on the new master only in the pre phase, and on all
1904 the nodes in the post phase.
1908 "OP_TARGET": self.new_master,
1909 "NEW_MASTER": self.new_master,
1910 "OLD_MASTER": self.old_master,
1912 return env, [self.new_master], self.cfg.GetNodeList()
1914 def CheckPrereq(self):
1915 """Check prerequisites.
1917 This checks that we are not already the master.
1920 self.new_master = utils.HostInfo().name
1921 self.old_master = self.sstore.GetMasterNode()
1923 if self.old_master == self.new_master:
1924 raise errors.OpPrereqError("This command must be run on the node"
1925 " where you want the new master to be."
1926 " %s is already the master" %
1929 def Exec(self, feedback_fn):
1930 """Failover the master node.
1932 This command, when run on a non-master node, will cause the current
1933 master to cease being master, and the non-master to become the new master.
1937 #TODO: do not rely on gethostname returning the FQDN
1938 logger.Info("setting master to %s, old master: %s" %
1939 (self.new_master, self.old_master))
1941 if not rpc.call_node_stop_master(self.old_master):
1942 logger.Error("could disable the master role on the old master"
1943 " %s, please disable manually" % self.old_master)
1946 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1947 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1948 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1949 logger.Error("could not distribute the new simple store master file"
1950 " to the other nodes, please check.")
1952 if not rpc.call_node_start_master(self.new_master):
1953 logger.Error("could not start the master role on the new master"
1954 " %s, please check" % self.new_master)
1955 feedback_fn("Error in activating the master IP on the new master,"
1956 " please fix manually.")
1960 class LUQueryClusterInfo(NoHooksLU):
1961 """Query cluster configuration.
1967 def CheckPrereq(self):
1968 """No prerequsites needed for this LU.
1973 def Exec(self, feedback_fn):
1974 """Return cluster config.
1978 "name": self.sstore.GetClusterName(),
1979 "software_version": constants.RELEASE_VERSION,
1980 "protocol_version": constants.PROTOCOL_VERSION,
1981 "config_version": constants.CONFIG_VERSION,
1982 "os_api_version": constants.OS_API_VERSION,
1983 "export_version": constants.EXPORT_VERSION,
1984 "master": self.sstore.GetMasterNode(),
1985 "architecture": (platform.architecture()[0], platform.machine()),
1986 "hypervisor_type": self.sstore.GetHypervisorType(),
1992 class LUClusterCopyFile(NoHooksLU):
1993 """Copy file to cluster.
1996 _OP_REQP = ["nodes", "filename"]
1998 def CheckPrereq(self):
1999 """Check prerequisites.
2001 It should check that the named file exists and that the given list
2005 if not os.path.exists(self.op.filename):
2006 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2008 self.nodes = _GetWantedNodes(self, self.op.nodes)
2010 def Exec(self, feedback_fn):
2011 """Copy a file from master to some nodes.
2014 opts - class with options as members
2015 args - list containing a single element, the file name
2017 nodes - list containing the name of target nodes; if empty, all nodes
2020 filename = self.op.filename
2022 myname = utils.HostInfo().name
2024 for node in self.nodes:
2027 if not ssh.CopyFileToNode(node, filename):
2028 logger.Error("Copy of file %s to node %s failed" % (filename, node))
2031 class LUDumpClusterConfig(NoHooksLU):
2032 """Return a text-representation of the cluster-config.
2037 def CheckPrereq(self):
2038 """No prerequisites.
2043 def Exec(self, feedback_fn):
2044 """Dump a representation of the cluster config to the standard output.
2047 return self.cfg.DumpConfig()
2050 class LURunClusterCommand(NoHooksLU):
2051 """Run a command on some nodes.
2054 _OP_REQP = ["command", "nodes"]
2056 def CheckPrereq(self):
2057 """Check prerequisites.
2059 It checks that the given list of nodes is valid.
2062 self.nodes = _GetWantedNodes(self, self.op.nodes)
2064 def Exec(self, feedback_fn):
2065 """Run a command on some nodes.
2068 # put the master at the end of the nodes list
2069 master_node = self.sstore.GetMasterNode()
2070 if master_node in self.nodes:
2071 self.nodes.remove(master_node)
2072 self.nodes.append(master_node)
2075 for node in self.nodes:
2076 result = ssh.SSHCall(node, "root", self.op.command)
2077 data.append((node, result.output, result.exit_code))
2082 class LUActivateInstanceDisks(NoHooksLU):
2083 """Bring up an instance's disks.
2086 _OP_REQP = ["instance_name"]
2088 def CheckPrereq(self):
2089 """Check prerequisites.
2091 This checks that the instance is in the cluster.
2094 instance = self.cfg.GetInstanceInfo(
2095 self.cfg.ExpandInstanceName(self.op.instance_name))
2096 if instance is None:
2097 raise errors.OpPrereqError("Instance '%s' not known" %
2098 self.op.instance_name)
2099 self.instance = instance
2102 def Exec(self, feedback_fn):
2103 """Activate the disks.
2106 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2108 raise errors.OpExecError("Cannot activate block devices")
2113 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2114 """Prepare the block devices for an instance.
2116 This sets up the block devices on all nodes.
2119 instance: a ganeti.objects.Instance object
2120 ignore_secondaries: if true, errors on secondary nodes won't result
2121 in an error return from the function
2124 false if the operation failed
2125 list of (host, instance_visible_name, node_visible_name) if the operation
2126 succeeded, with the mapping from node devices to instance devices
2130 iname = instance.name
2131 # With the two passes mechanism we try to reduce the window of
2132 # opportunity for the race condition of switching DRBD to primary
2133 # before handshaking occurred, but we do not eliminate it
2135 # The proper fix would be to wait (with some limits) until the
2136 # connection has been made and drbd transitions from WFConnection
2137 # into any other network-connected state (Connected, SyncTarget,
2140 # 1st pass, assemble on all nodes in secondary mode
2141 for inst_disk in instance.disks:
2142 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2143 cfg.SetDiskID(node_disk, node)
2144 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2146 logger.Error("could not prepare block device %s on node %s"
2147 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2148 if not ignore_secondaries:
2151 # FIXME: race condition on drbd migration to primary
2153 # 2nd pass, do only the primary node
2154 for inst_disk in instance.disks:
2155 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2156 if node != instance.primary_node:
2158 cfg.SetDiskID(node_disk, node)
2159 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2161 logger.Error("could not prepare block device %s on node %s"
2162 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2164 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2166 # leave the disks configured for the primary node
2167 # this is a workaround that would be fixed better by
2168 # improving the logical/physical id handling
2169 for disk in instance.disks:
2170 cfg.SetDiskID(disk, instance.primary_node)
2172 return disks_ok, device_info
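# Hedged usage sketch (added; not part of the original module): callers such as
# LUActivateInstanceDisks treat the boolean as the success flag and keep the
# device list only for reporting, roughly:
#
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#
# Each device_info entry is (primary_node, iv_name, result-of-assemble-rpc),
# as appended in the second pass above.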
2175 def _StartInstanceDisks(cfg, instance, force):
2176 """Start the disks of an instance.
2179 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2180 ignore_secondaries=force)
2182 _ShutdownInstanceDisks(instance, cfg)
2183 if force is not None and not force:
2184 logger.Error("If the message above refers to a secondary node,"
2185 " you can retry the operation using '--force'.")
2186 raise errors.OpExecError("Disk consistency error")
2189 class LUDeactivateInstanceDisks(NoHooksLU):
2190 """Shutdown an instance's disks.
2193 _OP_REQP = ["instance_name"]
2195 def CheckPrereq(self):
2196 """Check prerequisites.
2198 This checks that the instance is in the cluster.
2201 instance = self.cfg.GetInstanceInfo(
2202 self.cfg.ExpandInstanceName(self.op.instance_name))
2203 if instance is None:
2204 raise errors.OpPrereqError("Instance '%s' not known" %
2205 self.op.instance_name)
2206 self.instance = instance
2208 def Exec(self, feedback_fn):
2209 """Deactivate the disks
2212 instance = self.instance
2213 ins_l = rpc.call_instance_list([instance.primary_node])
2214 ins_l = ins_l[instance.primary_node]
2215 if not isinstance(ins_l, list):
2216 raise errors.OpExecError("Can't contact node '%s'" %
2217 instance.primary_node)
2219 if self.instance.name in ins_l:
2220 raise errors.OpExecError("Instance is running, can't shutdown"
2223 _ShutdownInstanceDisks(instance, self.cfg)
2226 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2227 """Shutdown block devices of an instance.
2229 This does the shutdown on all nodes of the instance.
2231 If the ignore_primary is false, errors on the primary node also cause the shutdown to be reported as failed.
2236 for disk in instance.disks:
2237 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2238 cfg.SetDiskID(top_disk, node)
2239 if not rpc.call_blockdev_shutdown(node, top_disk):
2240 logger.Error("could not shutdown block device %s on node %s" %
2241 (disk.iv_name, node))
2242 if not ignore_primary or node != instance.primary_node:
2247 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2248 """Checks if a node has enough free memory.
2250 This function checks if a given node has the needed amount of free
2251 memory. In case the node has less memory or we cannot get the
2252 information from the node, this function raises an OpPrereqError
2256 - cfg: a ConfigWriter instance
2257 - node: the node name
2258 - reason: string to use in the error message
2259 - requested: the amount of memory in MiB
2262 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2263 if not (nodeinfo and isinstance(nodeinfo, dict) and
2264 node in nodeinfo and isinstance(nodeinfo[node], dict)):
2265 raise errors.OpPrereqError("Could not contact node %s for resource"
2266 " information" % (node,))
2268 free_mem = nodeinfo[node].get('memory_free')
2269 if not isinstance(free_mem, int):
2270 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2271 " was '%s'" % (node, free_mem))
2272 if requested > free_mem:
2273 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2274 " needed %s MiB, available %s MiB" %
2275 (node, reason, requested, free_mem))
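# Hedged usage sketch (added): how the LUs below call this helper; the 512 MiB
# figure is a hypothetical instance memory size, not a value from the original.
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name, 512)
#
# The helper returns nothing on success; on missing information or
# insufficient memory it raises errors.OpPrereqError, so the LU aborts while
# still in CheckPrereq, before any cluster state is touched.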
2278 class LUStartupInstance(LogicalUnit):
2279 """Starts an instance.
2282 HPATH = "instance-start"
2283 HTYPE = constants.HTYPE_INSTANCE
2284 _OP_REQP = ["instance_name", "force"]
2286 def BuildHooksEnv(self):
2289 This runs on master, primary and secondary nodes of the instance.
2293 "FORCE": self.op.force,
2295 env.update(_BuildInstanceHookEnvByObject(self.instance))
2296 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2297 list(self.instance.secondary_nodes))
2300 def CheckPrereq(self):
2301 """Check prerequisites.
2303 This checks that the instance is in the cluster.
2306 instance = self.cfg.GetInstanceInfo(
2307 self.cfg.ExpandInstanceName(self.op.instance_name))
2308 if instance is None:
2309 raise errors.OpPrereqError("Instance '%s' not known" %
2310 self.op.instance_name)
2312 # check bridge existence
2313 _CheckInstanceBridgesExist(instance)
2315 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2316 "starting instance %s" % instance.name,
2319 self.instance = instance
2320 self.op.instance_name = instance.name
2322 def Exec(self, feedback_fn):
2323 """Start the instance.
2326 instance = self.instance
2327 force = self.op.force
2328 extra_args = getattr(self.op, "extra_args", "")
2330 self.cfg.MarkInstanceUp(instance.name)
2332 node_current = instance.primary_node
2334 _StartInstanceDisks(self.cfg, instance, force)
2336 if not rpc.call_instance_start(node_current, instance, extra_args):
2337 _ShutdownInstanceDisks(instance, self.cfg)
2338 raise errors.OpExecError("Could not start instance")
2341 class LURebootInstance(LogicalUnit):
2342 """Reboot an instance.
2345 HPATH = "instance-reboot"
2346 HTYPE = constants.HTYPE_INSTANCE
2347 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2349 def BuildHooksEnv(self):
2352 This runs on master, primary and secondary nodes of the instance.
2356 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2358 env.update(_BuildInstanceHookEnvByObject(self.instance))
2359 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2360 list(self.instance.secondary_nodes))
2363 def CheckPrereq(self):
2364 """Check prerequisites.
2366 This checks that the instance is in the cluster.
2369 instance = self.cfg.GetInstanceInfo(
2370 self.cfg.ExpandInstanceName(self.op.instance_name))
2371 if instance is None:
2372 raise errors.OpPrereqError("Instance '%s' not known" %
2373 self.op.instance_name)
2375 # check bridge existence
2376 _CheckInstanceBridgesExist(instance)
2378 self.instance = instance
2379 self.op.instance_name = instance.name
2381 def Exec(self, feedback_fn):
2382 """Reboot the instance.
2385 instance = self.instance
2386 ignore_secondaries = self.op.ignore_secondaries
2387 reboot_type = self.op.reboot_type
2388 extra_args = getattr(self.op, "extra_args", "")
2390 node_current = instance.primary_node
2392 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2393 constants.INSTANCE_REBOOT_HARD,
2394 constants.INSTANCE_REBOOT_FULL]:
2395 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2396 (constants.INSTANCE_REBOOT_SOFT,
2397 constants.INSTANCE_REBOOT_HARD,
2398 constants.INSTANCE_REBOOT_FULL))
2400 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2401 constants.INSTANCE_REBOOT_HARD]:
2402 if not rpc.call_instance_reboot(node_current, instance,
2403 reboot_type, extra_args):
2404 raise errors.OpExecError("Could not reboot instance")
2406 if not rpc.call_instance_shutdown(node_current, instance):
2407 raise errors.OpExecError("could not shutdown instance for full reboot")
2408 _ShutdownInstanceDisks(instance, self.cfg)
2409 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2410 if not rpc.call_instance_start(node_current, instance, extra_args):
2411 _ShutdownInstanceDisks(instance, self.cfg)
2412 raise errors.OpExecError("Could not start instance for full reboot")
2414 self.cfg.MarkInstanceUp(instance.name)
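# Descriptive note (added, not in the original): the three reboot types differ
# in mechanism. INSTANCE_REBOOT_SOFT and INSTANCE_REBOOT_HARD are delegated to
# the primary node through rpc.call_instance_reboot, while INSTANCE_REBOOT_FULL
# emulates a reboot by shutting the instance down, recycling its disks and
# starting it again on the same primary node.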
2417 class LUShutdownInstance(LogicalUnit):
2418 """Shutdown an instance.
2421 HPATH = "instance-stop"
2422 HTYPE = constants.HTYPE_INSTANCE
2423 _OP_REQP = ["instance_name"]
2425 def BuildHooksEnv(self):
2428 This runs on master, primary and secondary nodes of the instance.
2431 env = _BuildInstanceHookEnvByObject(self.instance)
2432 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2433 list(self.instance.secondary_nodes))
2436 def CheckPrereq(self):
2437 """Check prerequisites.
2439 This checks that the instance is in the cluster.
2442 instance = self.cfg.GetInstanceInfo(
2443 self.cfg.ExpandInstanceName(self.op.instance_name))
2444 if instance is None:
2445 raise errors.OpPrereqError("Instance '%s' not known" %
2446 self.op.instance_name)
2447 self.instance = instance
2449 def Exec(self, feedback_fn):
2450 """Shutdown the instance.
2453 instance = self.instance
2454 node_current = instance.primary_node
2455 self.cfg.MarkInstanceDown(instance.name)
2456 if not rpc.call_instance_shutdown(node_current, instance):
2457 logger.Error("could not shutdown instance")
2459 _ShutdownInstanceDisks(instance, self.cfg)
2462 class LUReinstallInstance(LogicalUnit):
2463 """Reinstall an instance.
2466 HPATH = "instance-reinstall"
2467 HTYPE = constants.HTYPE_INSTANCE
2468 _OP_REQP = ["instance_name"]
2470 def BuildHooksEnv(self):
2473 This runs on master, primary and secondary nodes of the instance.
2476 env = _BuildInstanceHookEnvByObject(self.instance)
2477 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2478 list(self.instance.secondary_nodes))
2481 def CheckPrereq(self):
2482 """Check prerequisites.
2484 This checks that the instance is in the cluster and is not running.
2487 instance = self.cfg.GetInstanceInfo(
2488 self.cfg.ExpandInstanceName(self.op.instance_name))
2489 if instance is None:
2490 raise errors.OpPrereqError("Instance '%s' not known" %
2491 self.op.instance_name)
2492 if instance.disk_template == constants.DT_DISKLESS:
2493 raise errors.OpPrereqError("Instance '%s' has no disks" %
2494 self.op.instance_name)
2495 if instance.status != "down":
2496 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2497 self.op.instance_name)
2498 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2500 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2501 (self.op.instance_name,
2502 instance.primary_node))
2504 self.op.os_type = getattr(self.op, "os_type", None)
2505 if self.op.os_type is not None:
2507 pnode = self.cfg.GetNodeInfo(
2508 self.cfg.ExpandNodeName(instance.primary_node))
2510 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2512 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2514 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2515 " primary node" % self.op.os_type)
2517 self.instance = instance
2519 def Exec(self, feedback_fn):
2520 """Reinstall the instance.
2523 inst = self.instance
2525 if self.op.os_type is not None:
2526 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2527 inst.os = self.op.os_type
2528 self.cfg.AddInstance(inst)
2530 _StartInstanceDisks(self.cfg, inst, None)
2532 feedback_fn("Running the instance OS create scripts...")
2533 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2534 raise errors.OpExecError("Could not install OS for instance %s"
2536 (inst.name, inst.primary_node))
2538 _ShutdownInstanceDisks(inst, self.cfg)
2541 class LURenameInstance(LogicalUnit):
2542 """Rename an instance.
2545 HPATH = "instance-rename"
2546 HTYPE = constants.HTYPE_INSTANCE
2547 _OP_REQP = ["instance_name", "new_name"]
2549 def BuildHooksEnv(self):
2552 This runs on master, primary and secondary nodes of the instance.
2555 env = _BuildInstanceHookEnvByObject(self.instance)
2556 env["INSTANCE_NEW_NAME"] = self.op.new_name
2557 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2558 list(self.instance.secondary_nodes))
2561 def CheckPrereq(self):
2562 """Check prerequisites.
2564 This checks that the instance is in the cluster and is not running.
2567 instance = self.cfg.GetInstanceInfo(
2568 self.cfg.ExpandInstanceName(self.op.instance_name))
2569 if instance is None:
2570 raise errors.OpPrereqError("Instance '%s' not known" %
2571 self.op.instance_name)
2572 if instance.status != "down":
2573 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2574 self.op.instance_name)
2575 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2577 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2578 (self.op.instance_name,
2579 instance.primary_node))
2580 self.instance = instance
2582 # new name verification
2583 name_info = utils.HostInfo(self.op.new_name)
2585 self.op.new_name = new_name = name_info.name
2586 instance_list = self.cfg.GetInstanceList()
2587 if new_name in instance_list:
2588 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2591 if not getattr(self.op, "ignore_ip", False):
2592 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2593 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2594 (name_info.ip, new_name))
2597 def Exec(self, feedback_fn):
2598 """Reinstall the instance.
2601 inst = self.instance
2602 old_name = inst.name
2604 self.cfg.RenameInstance(inst.name, self.op.new_name)
2606 # re-read the instance from the configuration after rename
2607 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2609 _StartInstanceDisks(self.cfg, inst, None)
2611 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2613 msg = ("Could not run OS rename script for instance %s on node %s"
2614 " (but the instance has been renamed in Ganeti)" %
2615 (inst.name, inst.primary_node))
2618 _ShutdownInstanceDisks(inst, self.cfg)
2621 class LURemoveInstance(LogicalUnit):
2622 """Remove an instance.
2625 HPATH = "instance-remove"
2626 HTYPE = constants.HTYPE_INSTANCE
2627 _OP_REQP = ["instance_name", "ignore_failures"]
2629 def BuildHooksEnv(self):
2632 This runs on master, primary and secondary nodes of the instance.
2635 env = _BuildInstanceHookEnvByObject(self.instance)
2636 nl = [self.sstore.GetMasterNode()]
2639 def CheckPrereq(self):
2640 """Check prerequisites.
2642 This checks that the instance is in the cluster.
2645 instance = self.cfg.GetInstanceInfo(
2646 self.cfg.ExpandInstanceName(self.op.instance_name))
2647 if instance is None:
2648 raise errors.OpPrereqError("Instance '%s' not known" %
2649 self.op.instance_name)
2650 self.instance = instance
2652 def Exec(self, feedback_fn):
2653 """Remove the instance.
2656 instance = self.instance
2657 logger.Info("shutting down instance %s on node %s" %
2658 (instance.name, instance.primary_node))
2660 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2661 if self.op.ignore_failures:
2662 feedback_fn("Warning: can't shutdown instance")
2664 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2665 (instance.name, instance.primary_node))
2667 logger.Info("removing block devices for instance %s" % instance.name)
2669 if not _RemoveDisks(instance, self.cfg):
2670 if self.op.ignore_failures:
2671 feedback_fn("Warning: can't remove instance's disks")
2673 raise errors.OpExecError("Can't remove instance's disks")
2675 logger.Info("removing instance %s out of cluster config" % instance.name)
2677 self.cfg.RemoveInstance(instance.name)
2680 class LUQueryInstances(NoHooksLU):
2681 """Logical unit for querying instances.
2684 _OP_REQP = ["output_fields", "names"]
2686 def CheckPrereq(self):
2687 """Check prerequisites.
2689 This checks that the fields required are valid output fields.
2692 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2693 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2694 "admin_state", "admin_ram",
2695 "disk_template", "ip", "mac", "bridge",
2696 "sda_size", "sdb_size", "vcpus", "tags"],
2697 dynamic=self.dynamic_fields,
2698 selected=self.op.output_fields)
2700 self.wanted = _GetWantedInstances(self, self.op.names)
2702 def Exec(self, feedback_fn):
2703 """Computes the list of nodes and their attributes.
2706 instance_names = self.wanted
2707 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2710 # begin data gathering
2712 nodes = frozenset([inst.primary_node for inst in instance_list])
2715 if self.dynamic_fields.intersection(self.op.output_fields):
2717 node_data = rpc.call_all_instances_info(nodes)
2719 result = node_data[name]
2721 live_data.update(result)
2722 elif result == False:
2723 bad_nodes.append(name)
2724 # else no instance is alive
2726 live_data = dict([(name, {}) for name in instance_names])
2728 # end data gathering
2731 for instance in instance_list:
2733 for field in self.op.output_fields:
2738 elif field == "pnode":
2739 val = instance.primary_node
2740 elif field == "snodes":
2741 val = list(instance.secondary_nodes)
2742 elif field == "admin_state":
2743 val = (instance.status != "down")
2744 elif field == "oper_state":
2745 if instance.primary_node in bad_nodes:
2748 val = bool(live_data.get(instance.name))
2749 elif field == "status":
2750 if instance.primary_node in bad_nodes:
2751 val = "ERROR_nodedown"
2753 running = bool(live_data.get(instance.name))
2755 if instance.status != "down":
2760 if instance.status != "down":
2764 elif field == "admin_ram":
2765 val = instance.memory
2766 elif field == "oper_ram":
2767 if instance.primary_node in bad_nodes:
2769 elif instance.name in live_data:
2770 val = live_data[instance.name].get("memory", "?")
2773 elif field == "disk_template":
2774 val = instance.disk_template
2776 val = instance.nics[0].ip
2777 elif field == "bridge":
2778 val = instance.nics[0].bridge
2779 elif field == "mac":
2780 val = instance.nics[0].mac
2781 elif field == "sda_size" or field == "sdb_size":
2782 disk = instance.FindDisk(field[:3])
2787 elif field == "vcpus":
2788 val = instance.vcpus
2789 elif field == "tags":
2790 val = list(instance.GetTags())
2792 raise errors.ParameterError(field)
2799 class LUFailoverInstance(LogicalUnit):
2800 """Failover an instance.
2803 HPATH = "instance-failover"
2804 HTYPE = constants.HTYPE_INSTANCE
2805 _OP_REQP = ["instance_name", "ignore_consistency"]
2807 def BuildHooksEnv(self):
2810 This runs on master, primary and secondary nodes of the instance.
2814 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2816 env.update(_BuildInstanceHookEnvByObject(self.instance))
2817 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2820 def CheckPrereq(self):
2821 """Check prerequisites.
2823 This checks that the instance is in the cluster.
2826 instance = self.cfg.GetInstanceInfo(
2827 self.cfg.ExpandInstanceName(self.op.instance_name))
2828 if instance is None:
2829 raise errors.OpPrereqError("Instance '%s' not known" %
2830 self.op.instance_name)
2832 if instance.disk_template not in constants.DTS_NET_MIRROR:
2833 raise errors.OpPrereqError("Instance's disk layout is not"
2834 " network mirrored, cannot failover.")
2836 secondary_nodes = instance.secondary_nodes
2837 if not secondary_nodes:
2838 raise errors.ProgrammerError("no secondary node but using "
2839 "DT_REMOTE_RAID1 template")
2841 target_node = secondary_nodes[0]
2842 # check memory requirements on the secondary node
2843 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2844 instance.name, instance.memory)
2846 # check bridge existence
2847 brlist = [nic.bridge for nic in instance.nics]
2848 if not rpc.call_bridges_exist(target_node, brlist):
2849 raise errors.OpPrereqError("One or more target bridges %s do not"
2850 " exist on destination node '%s'" %
2851 (brlist, target_node))
2853 self.instance = instance
2855 def Exec(self, feedback_fn):
2856 """Failover an instance.
2858 The failover is done by shutting it down on its present node and
2859 starting it on the secondary.
2862 instance = self.instance
2864 source_node = instance.primary_node
2865 target_node = instance.secondary_nodes[0]
2867 feedback_fn("* checking disk consistency between source and target")
2868 for dev in instance.disks:
2869 # for remote_raid1, these are md over drbd
2870 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2871 if instance.status == "up" and not self.op.ignore_consistency:
2872 raise errors.OpExecError("Disk %s is degraded on target node,"
2873 " aborting failover." % dev.iv_name)
2875 feedback_fn("* shutting down instance on source node")
2876 logger.Info("Shutting down instance %s on node %s" %
2877 (instance.name, source_node))
2879 if not rpc.call_instance_shutdown(source_node, instance):
2880 if self.op.ignore_consistency:
2881 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2882 " anyway. Please make sure node %s is down" %
2883 (instance.name, source_node, source_node))
2885 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2886 (instance.name, source_node))
2888 feedback_fn("* deactivating the instance's disks on source node")
2889 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2890 raise errors.OpExecError("Can't shut down the instance's disks.")
2892 instance.primary_node = target_node
2893 # distribute new instance config to the other nodes
2894 self.cfg.Update(instance)
2896 # Only start the instance if it's marked as up
2897 if instance.status == "up":
2898 feedback_fn("* activating the instance's disks on target node")
2899 logger.Info("Starting instance %s on node %s" %
2900 (instance.name, target_node))
2902 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2903 ignore_secondaries=True)
2905 _ShutdownInstanceDisks(instance, self.cfg)
2906 raise errors.OpExecError("Can't activate the instance's disks")
2908 feedback_fn("* starting the instance on the target node")
2909 if not rpc.call_instance_start(target_node, instance, None):
2910 _ShutdownInstanceDisks(instance, self.cfg)
2911 raise errors.OpExecError("Could not start instance %s on node %s." %
2912 (instance.name, target_node))
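# Summary comment (added for clarity): the failover sequence above is: check
# disk consistency on the target, shut the instance down on the source node,
# deactivate its disks there, switch primary_node in the configuration, and,
# only if the instance was marked up, reassemble the disks on the old
# secondary and start the instance there.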
2915 class LUMigrateInstance(LogicalUnit):
2916 """Migrate an instance.
2918 This is migration without shutting down, compared to the failover,
2919 which is done with shutdown.
2922 HPATH = "instance-migrate"
2923 HTYPE = constants.HTYPE_INSTANCE
2924 _OP_REQP = ["instance_name", "live", "cleanup"]
2926 def BuildHooksEnv(self):
2929 This runs on master, primary and secondary nodes of the instance.
2932 env = _BuildInstanceHookEnvByObject(self.instance)
2933 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2936 def CheckPrereq(self):
2937 """Check prerequisites.
2939 This checks that the instance is in the cluster.
2942 instance = self.cfg.GetInstanceInfo(
2943 self.cfg.ExpandInstanceName(self.op.instance_name))
2944 if instance is None:
2945 raise errors.OpPrereqError("Instance '%s' not known" %
2946 self.op.instance_name)
2948 if instance.disk_template != constants.DT_DRBD8:
2949 raise errors.OpPrereqError("Instance's disk layout is not"
2950 " drbd8, cannot migrate.")
2952 secondary_nodes = instance.secondary_nodes
2953 if not secondary_nodes:
2954 raise errors.ProgrammerError("no secondary node but using "
2955 "drbd8 disk template")
2957 target_node = secondary_nodes[0]
2958 # check memory requirements on the secondary node
2959 _CheckNodeFreeMemory(self.cfg, target_node, "migrating instance %s" %
2960 instance.name, instance.memory)
2962 # check bridge existence
2963 brlist = [nic.bridge for nic in instance.nics]
2964 if not rpc.call_bridges_exist(target_node, brlist):
2965 raise errors.OpPrereqError("One or more target bridges %s do not"
2966 " exist on destination node '%s'" %
2967 (brlist, target_node))
2969 if not self.op.cleanup:
2970 migratable = rpc.call_instance_migratable(instance.primary_node,
2973 raise errors.OpPrereqError("Can't contact node '%s'" %
2974 instance.primary_node)
2975 if not migratable[0]:
2976 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
2979 self.instance = instance
2981 def _WaitUntilSync(self):
2982 """Poll with custom rpc for disk sync.
2984 This uses our own step-based rpc call.
2987 self.feedback_fn("* wait until resync is done")
2991 result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
2992 self.instance.disks,
2993 self.nodes_ip, False,
2994 constants.DRBD_RECONF_RPC_WFSYNC)
2996 for node in self.all_nodes:
2997 if not result[node] or not result[node][0]:
2998 raise errors.OpExecError("Cannot resync disks on node %s" % (node,))
2999 node_done, node_percent = result[node][1]
3000 all_done = all_done and node_done
3001 if node_percent is not None:
3002 min_percent = min(min_percent, node_percent)
3004 if min_percent < 100:
3005 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3008 def _EnsureSecondary(self, node):
3009 """Demote a node to secondary.
3012 self.feedback_fn("* switching node %s to secondary mode" % node)
3013 result = rpc.call_drbd_reconfig_net([node], self.instance.name,
3014 self.instance.disks,
3015 self.nodes_ip, False,
3016 constants.DRBD_RECONF_RPC_SECONDARY)
3017 if not result[node] or not result[node][0]:
3018 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3020 (node, result[node][1]))
3022 def _GoStandalone(self):
3023 """Disconnect from the network.
3026 self.feedback_fn("* changing into standalone mode")
3027 result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3028 self.instance.disks,
3029 self.nodes_ip, True,
3030 constants.DRBD_RECONF_RPC_DISCONNECT)
3031 for node in self.all_nodes:
3032 if not result[node] or not result[node][0]:
3033 raise errors.OpExecError("Cannot disconnect disks node %s,"
3034 " error %s" % (node, result[node][1]))
3036 def _GoReconnect(self, multimaster):
3037 """Reconnect to the network.
3043 msg = "single-master"
3044 self.feedback_fn("* changing disks into %s mode" % msg)
3045 result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3046 self.instance.disks,
3049 constants.DRBD_RECONF_RPC_RECONNECT)
3050 for node in self.all_nodes:
3051 if not result[node] or not result[node][0]:
3052 raise errors.OpExecError("Cannot change disks config on node %s,"
3053 " error %s" % (node, result[node][1]))
3055 def _IdentifyDisks(self):
3056 """Start the migration RPC sequence.
3059 self.feedback_fn("* identifying disks")
3060 result = rpc.call_drbd_reconfig_net(self.all_nodes,
3062 self.instance.disks,
3063 self.nodes_ip, True,
3064 constants.DRBD_RECONF_RPC_INIT)
3065 for node in self.all_nodes:
3066 if not result[node] or not result[node][0]:
3067 raise errors.OpExecError("Cannot identify disks node %s,"
3068 " error %s" % (node, result[node][1]))
3070 def _ExecCleanup(self):
3071 """Try to cleanup after a failed migration.
3073 The cleanup is done by:
3074 - check that the instance is running only on one node
3075 (and update the config if needed)
3076 - change disks on its secondary node to secondary
3077 - wait until disks are fully synchronized
3078 - disconnect from the network
3079 - change disks into single-master mode
3080 - wait again until disks are fully synchronized
3083 instance = self.instance
3084 target_node = self.target_node
3085 source_node = self.source_node
3087 # check running on only one node
3088 self.feedback_fn("* checking where the instance actually runs"
3089 " (if this hangs, the hypervisor might be in"
3091 ins_l = rpc.call_instance_list(self.all_nodes)
3092 for node in self.all_nodes:
3093 if not isinstance(ins_l[node], list):
3094 raise errors.OpExecError("Can't contact node '%s'" % node)
3096 runningon_source = instance.name in ins_l[source_node]
3097 runningon_target = instance.name in ins_l[target_node]
3099 if runningon_source and runningon_target:
3100 raise errors.OpExecError("Instance seems to be running on two nodes,"
3101 " or the hypervisor is confused. You will have"
3102 " to ensure manually that it runs only on one"
3103 " and restart this operation.")
3105 if not (runningon_source or runningon_target):
3106 raise errors.OpExecError("Instance does not seem to be running at all."
3107 " In this case, it's safer to repair by"
3108 " running 'gnt-instance stop' to ensure disk"
3109 " shutdown, and then restarting it.")
3111 if runningon_target:
3112 # the migration has actually succeeded, we need to update the config
3113 self.feedback_fn("* instance running on secondary node (%s),"
3114 " updating config" % target_node)
3115 instance.primary_node = target_node
3116 self.cfg.Update(instance)
3117 demoted_node = source_node
3119 self.feedback_fn("* instance confirmed to be running on its"
3120 " primary node (%s)" % source_node)
3121 demoted_node = target_node
3123 self._IdentifyDisks()
3125 self._EnsureSecondary(demoted_node)
3126 self._WaitUntilSync()
3127 self._GoStandalone()
3128 self._GoReconnect(False)
3129 self._WaitUntilSync()
3131 self.feedback_fn("* done")
3133 def _ExecMigration(self):
3134 """Migrate an instance.
3136 The migrate is done by:
3137 - change the disks into dual-master mode
3138 - wait until disks are fully synchronized again
3139 - migrate the instance
3140 - change disks on the new secondary node (the old primary) to secondary
3141 - wait until disks are fully synchronized
3142 - change disks into single-master mode
3145 instance = self.instance
3146 target_node = self.target_node
3147 source_node = self.source_node
3149 self.feedback_fn("* checking disk consistency between source and target")
3150 for dev in instance.disks:
3151 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
3152 raise errors.OpExecError("Disk %s is degraded or not fully"
3153 " synchronized on target node,"
3154 " aborting migrate." % dev.iv_name)
3156 self._IdentifyDisks()
3158 self._EnsureSecondary(target_node)
3159 self._GoStandalone()
3160 self._GoReconnect(True)
3161 self._WaitUntilSync()
3163 self.feedback_fn("* migrating instance to %s" % target_node)
3165 result = rpc.call_instance_migrate(source_node, instance,
3166 self.nodes_ip[target_node],
3168 if not result or not result[0]:
3169 logger.Error("Instance migration failed, trying to revert disk status")
3171 self._EnsureSecondary(target_node)
3172 self._GoStandalone()
3173 self._GoReconnect(False)
3174 self._WaitUntilSync()
3175 except errors.OpExecError, err:
3176 logger.Error("Can't reconnect the drives: error '%s'\n"
3177 "Please look and recover the instance status" % str(err))
3179 raise errors.OpExecError("Could not migrate instance %s: %s" %
3180 (instance.name, result[1]))
3183 instance.primary_node = target_node
3184 # distribute new instance config to the other nodes
3185 self.cfg.Update(instance)
3187 self._EnsureSecondary(source_node)
3188 self._WaitUntilSync()
3189 self._GoStandalone()
3190 self._GoReconnect(False)
3191 self._WaitUntilSync()
3193 self.feedback_fn("* done")
3195 def Exec(self, feedback_fn):
3196 """Perform the migration.
3199 self.feedback_fn = feedback_fn
3201 self.source_node = self.instance.primary_node
3202 self.target_node = self.instance.secondary_nodes[0]
3203 self.all_nodes = [self.source_node, self.target_node]
3205 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3206 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3209 return self._ExecCleanup()
3211 return self._ExecMigration()
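# Descriptive note (added): both the migration and the cleanup paths drive the
# DRBD devices through the same reconfiguration RPC, selected by the constants
# used above: DRBD_RECONF_RPC_INIT (identify the disks),
# DRBD_RECONF_RPC_DISCONNECT (go standalone), DRBD_RECONF_RPC_RECONNECT
# (reconnect in single- or dual-master mode), DRBD_RECONF_RPC_SECONDARY
# (demote one node) and DRBD_RECONF_RPC_WFSYNC (poll until resync completes).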
3214 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
3215 """Create a tree of block devices on the primary node.
3217 This always creates all devices.
3221 for child in device.children:
3222 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
3225 cfg.SetDiskID(device, node)
3226 new_id = rpc.call_blockdev_create(node, device, device.size,
3227 instance.name, True, info)
3230 if device.physical_id is None:
3231 device.physical_id = new_id
3235 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
3236 """Create a tree of block devices on a secondary node.
3238 If this device type has to be created on secondaries, create it and
3241 If not, just recurse to children keeping the same 'force' value.
3244 if device.CreateOnSecondary():
3247 for child in device.children:
3248 if not _CreateBlockDevOnSecondary(cfg, node, instance,
3249 child, force, info):
3254 cfg.SetDiskID(device, node)
3255 new_id = rpc.call_blockdev_create(node, device, device.size,
3256 instance.name, False, info)
3259 if device.physical_id is None:
3260 device.physical_id = new_id
3264 def _GenerateUniqueNames(cfg, exts):
3265 """Generate a suitable LV name.
3267 This will generate a logical volume name for the given instance.
3272 new_id = cfg.GenerateUniqueID()
3273 results.append("%s%s" % (new_id, val))
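# Hedged example (added): for exts == [".sda", ".sdb"] the result would look
# like ["<unique-id-1>.sda", "<unique-id-2>.sdb"], each prefix being a fresh
# identifier obtained from cfg.GenerateUniqueID(); the placeholder IDs shown
# here are illustrative only.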
3277 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
3278 """Generate a drbd device complete with its children.
3281 port = cfg.AllocatePort()
3282 vgname = cfg.GetVGName()
3283 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3284 logical_id=(vgname, names[0]))
3285 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3286 logical_id=(vgname, names[1]))
3287 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
3288 logical_id = (primary, secondary, port),
3289 children = [dev_data, dev_meta])
3293 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
3294 """Generate a drbd8 device complete with its children.
3297 port = cfg.AllocatePort()
3298 vgname = cfg.GetVGName()
3299 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3300 logical_id=(vgname, names[0]))
3301 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3302 logical_id=(vgname, names[1]))
3303 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3304 logical_id = (primary, secondary, port),
3305 children = [dev_data, dev_meta],
3309 def _GenerateDiskTemplate(cfg, template_name,
3310 instance_name, primary_node,
3311 secondary_nodes, disk_sz, swap_sz):
3312 """Generate the entire disk layout for a given template type.
3315 #TODO: compute space requirements
3317 vgname = cfg.GetVGName()
3318 if template_name == constants.DT_DISKLESS:
3320 elif template_name == constants.DT_PLAIN:
3321 if len(secondary_nodes) != 0:
3322 raise errors.ProgrammerError("Wrong template configuration")
3324 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3325 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3326 logical_id=(vgname, names[0]),
3328 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3329 logical_id=(vgname, names[1]),
3331 disks = [sda_dev, sdb_dev]
3332 elif template_name == constants.DT_LOCAL_RAID1:
3333 if len(secondary_nodes) != 0:
3334 raise errors.ProgrammerError("Wrong template configuration")
3337 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
3338 ".sdb_m1", ".sdb_m2"])
3339 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3340 logical_id=(vgname, names[0]))
3341 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3342 logical_id=(vgname, names[1]))
3343 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
3345 children = [sda_dev_m1, sda_dev_m2])
3346 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3347 logical_id=(vgname, names[2]))
3348 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3349 logical_id=(vgname, names[3]))
3350 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
3352 children = [sdb_dev_m1, sdb_dev_m2])
3353 disks = [md_sda_dev, md_sdb_dev]
3354 elif template_name == constants.DT_REMOTE_RAID1:
3355 if len(secondary_nodes) != 1:
3356 raise errors.ProgrammerError("Wrong template configuration")
3357 remote_node = secondary_nodes[0]
3358 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3359 ".sdb_data", ".sdb_meta"])
3360 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3361 disk_sz, names[0:2])
3362 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
3363 children = [drbd_sda_dev], size=disk_sz)
3364 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3365 swap_sz, names[2:4])
3366 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
3367 children = [drbd_sdb_dev], size=swap_sz)
3368 disks = [md_sda_dev, md_sdb_dev]
3369 elif template_name == constants.DT_DRBD8:
3370 if len(secondary_nodes) != 1:
3371 raise errors.ProgrammerError("Wrong template configuration")
3372 remote_node = secondary_nodes[0]
3373 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3374 ".sdb_data", ".sdb_meta"])
3375 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3376 disk_sz, names[0:2], "sda")
3377 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3378 swap_sz, names[2:4], "sdb")
3379 disks = [drbd_sda_dev, drbd_sdb_dev]
3381 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
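# Hedged example (added): for constants.DT_PLAIN the function builds two plain
# LVM-backed disks, roughly
#
#   [objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
#                 logical_id=(vgname, "<id>.sda")),
#    objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
#                 logical_id=(vgname, "<id>.sdb"))]
#
# whereas the DRBD-based templates wrap similar data/meta volumes inside a
# DRBD device per disk (see _GenerateDRBD8Branch above). The "<id>" names are
# placeholders for the unique names generated above.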
3385 def _GetInstanceInfoText(instance):
3386 """Compute that text that should be added to the disk's metadata.
3389 return "originstname+%s" % instance.name
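# Example (added for clarity): for a hypothetical instance named
# "inst1.example.com" this yields "originstname+inst1.example.com", the text
# passed to the blockdev creation RPCs below so that a volume can be related
# back to its owning instance.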
3392 def _CreateDisks(cfg, instance):
3393 """Create all disks for an instance.
3395 This abstracts away some work from AddInstance.
3398 instance: the instance object
3401 True or False showing the success of the creation process
3404 info = _GetInstanceInfoText(instance)
3406 for device in instance.disks:
3407 logger.Info("creating volume %s for instance %s" %
3408 (device.iv_name, instance.name))
3410 for secondary_node in instance.secondary_nodes:
3411 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3412 device, False, info):
3413 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3414 (device.iv_name, device, secondary_node))
3417 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3418 instance, device, info):
3419 logger.Error("failed to create volume %s on primary!" %
3425 def _RemoveDisks(instance, cfg):
3426 """Remove all disks for an instance.
3428 This abstracts away some work from `AddInstance()` and
3429 `RemoveInstance()`. Note that in case some of the devices couldn't
3430 be removed, the removal will continue with the other ones (compare
3431 with `_CreateDisks()`).
3434 instance: the instance object
3437 True or False showing the success of the removal process
3440 logger.Info("removing block devices for instance %s" % instance.name)
3443 for device in instance.disks:
3444 for node, disk in device.ComputeNodeTree(instance.primary_node):
3445 cfg.SetDiskID(disk, node)
3446 if not rpc.call_blockdev_remove(node, disk):
3447 logger.Error("could not remove block device %s on node %s,"
3448 " continuing anyway" %
3449 (device.iv_name, node))
3454 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3455 """Compute disk size requirements in the volume group
3457 This is currently hard-coded for the two-drive layout.
3460 # Required free disk space as a function of disk and swap space
3462 constants.DT_DISKLESS: None,
3463 constants.DT_PLAIN: disk_size + swap_size,
3464 constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3465 # 256 MB are added for drbd metadata, 128MB for each drbd device
3466 constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3467 constants.DT_DRBD8: disk_size + swap_size + 256,
3470 if disk_template not in req_size_dict:
3471 raise errors.ProgrammerError("Disk template '%s' size requirement"
3472 " is unknown" % disk_template)
3474 return req_size_dict[disk_template]
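# Worked example (added): with disk_size=10240 and swap_size=4096 (both in
# MiB), the requirement is 14336 MiB for DT_PLAIN, 28672 MiB for
# DT_LOCAL_RAID1, and 14592 MiB for DT_REMOTE_RAID1 or DT_DRBD8 (the extra
# 256 MiB covering DRBD metadata), while DT_DISKLESS needs no volume group
# space at all.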
3477 class LUCreateInstance(LogicalUnit):
3478 """Create an instance.
3481 HPATH = "instance-add"
3482 HTYPE = constants.HTYPE_INSTANCE
3483 _OP_REQP = ["instance_name", "mem_size", "disk_size",
3484 "disk_template", "swap_size", "mode", "start", "vcpus",
3485 "wait_for_sync", "ip_check", "mac"]
3487 def _RunAllocator(self):
3488 """Run the allocator based on input opcode.
3491 disks = [{"size": self.op.disk_size, "mode": "w"},
3492 {"size": self.op.swap_size, "mode": "w"}]
3493 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3494 "bridge": self.op.bridge}]
3495 ial = IAllocator(self.cfg, self.sstore,
3496 mode=constants.IALLOCATOR_MODE_ALLOC,
3497 name=self.op.instance_name,
3498 disk_template=self.op.disk_template,
3501 vcpus=self.op.vcpus,
3502 mem_size=self.op.mem_size,
3507 ial.Run(self.op.iallocator)
3510 raise errors.OpPrereqError("Can't compute nodes using"
3511 " iallocator '%s': %s" % (self.op.iallocator,
3513 if len(ial.nodes) != ial.required_nodes:
3514 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3515 " of nodes (%s), required %s" %
3516 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3517 self.op.pnode = ial.nodes[0]
3518 logger.ToStdout("Selected nodes for the instance: %s" %
3519 (", ".join(ial.nodes),))
3520 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3521 (self.op.instance_name, self.op.iallocator, ial.nodes))
3522 if ial.required_nodes == 2:
3523 self.op.snode = ial.nodes[1]
3525 def BuildHooksEnv(self):
3528 This runs on master, primary and secondary nodes of the instance.
3532 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3533 "INSTANCE_DISK_SIZE": self.op.disk_size,
3534 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3535 "INSTANCE_ADD_MODE": self.op.mode,
3537 if self.op.mode == constants.INSTANCE_IMPORT:
3538 env["INSTANCE_SRC_NODE"] = self.op.src_node
3539 env["INSTANCE_SRC_PATH"] = self.op.src_path
3540 env["INSTANCE_SRC_IMAGE"] = self.src_image
3542 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3543 primary_node=self.op.pnode,
3544 secondary_nodes=self.secondaries,
3545 status=self.instance_status,
3546 os_type=self.op.os_type,
3547 memory=self.op.mem_size,
3548 vcpus=self.op.vcpus,
3549 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3552 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3557 def CheckPrereq(self):
3558 """Check prerequisites.
3561 # set optional parameters to none if they don't exist
3562 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3563 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3564 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3565 if not hasattr(self.op, attr):
3566 setattr(self.op, attr, None)
3568 if self.op.mode not in (constants.INSTANCE_CREATE,
3569 constants.INSTANCE_IMPORT):
3570 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3573 if self.op.mode == constants.INSTANCE_IMPORT:
3574 src_node = getattr(self.op, "src_node", None)
3575 src_path = getattr(self.op, "src_path", None)
3576 if src_node is None or src_path is None:
3577 raise errors.OpPrereqError("Importing an instance requires source"
3578 " node and path options")
3579 src_node_full = self.cfg.ExpandNodeName(src_node)
3580 if src_node_full is None:
3581 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3582 self.op.src_node = src_node = src_node_full
3584 if not os.path.isabs(src_path):
3585 raise errors.OpPrereqError("The source path must be absolute")
3587 export_info = rpc.call_export_info(src_node, src_path)
3590 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3592 if not export_info.has_section(constants.INISECT_EXP):
3593 raise errors.ProgrammerError("Corrupted export config")
3595 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3596 if (int(ei_version) != constants.EXPORT_VERSION):
3597 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3598 (ei_version, constants.EXPORT_VERSION))
3600 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3601 raise errors.OpPrereqError("Can't import instance with more than"
3604 # FIXME: are the old os-es, disk sizes, etc. useful?
3605 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3606 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3608 self.src_image = diskimage
3609 else: # INSTANCE_CREATE
3610 if getattr(self.op, "os_type", None) is None:
3611 raise errors.OpPrereqError("No guest OS specified")
3613 #### instance parameters check
3615 # disk template and mirror node verification
3616 if self.op.disk_template not in constants.DISK_TEMPLATES:
3617 raise errors.OpPrereqError("Invalid disk template name")
3619 # instance name verification
3620 hostname1 = utils.HostInfo(self.op.instance_name)
3622 self.op.instance_name = instance_name = hostname1.name
3623 instance_list = self.cfg.GetInstanceList()
3624 if instance_name in instance_list:
3625 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3628 # ip validity checks
3629 ip = getattr(self.op, "ip", None)
3630 if ip is None or ip.lower() == "none":
3632 elif ip.lower() == "auto":
3633 inst_ip = hostname1.ip
3635 if not utils.IsValidIP(ip):
3636 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3637 " like a valid IP" % ip)
3639 self.inst_ip = self.op.ip = inst_ip
3641 if self.op.start and not self.op.ip_check:
3642 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3643 " adding an instance in start mode")
3645 if self.op.ip_check:
3646 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3647 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3648 (hostname1.ip, instance_name))
3650 # MAC address verification
3651 if self.op.mac != "auto":
3652 if not utils.IsValidMac(self.op.mac.lower()):
3653 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3656 # bridge verification
3657 bridge = getattr(self.op, "bridge", None)
3659 self.op.bridge = self.cfg.GetDefBridge()
3661 self.op.bridge = bridge
3663 # boot order verification
3664 if self.op.hvm_boot_order is not None:
3665 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3666 raise errors.OpPrereqError("invalid boot order specified,"
3667 " must be one or more of [acdn]")
3670 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3671 raise errors.OpPrereqError("One and only one of iallocator and primary"
3672 " node must be given")
3674 if self.op.iallocator is not None:
3675 self._RunAllocator()
3677 #### node related checks
3679 # check primary node
3680 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3682 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3684 self.op.pnode = pnode.name
3686 self.secondaries = []
3688 # mirror node verification
3689 if self.op.disk_template in constants.DTS_NET_MIRROR:
3690 if getattr(self.op, "snode", None) is None:
3691 raise errors.OpPrereqError("The networked disk templates need"
3694 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3695 if snode_name is None:
3696 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3698 elif snode_name == pnode.name:
3699 raise errors.OpPrereqError("The secondary node cannot be"
3700 " the primary node.")
3701 self.secondaries.append(snode_name)
3703 req_size = _ComputeDiskSize(self.op.disk_template,
3704 self.op.disk_size, self.op.swap_size)
3706 # Check lv size requirements
3707 if req_size is not None:
3708 nodenames = [pnode.name] + self.secondaries
3709 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3710 for node in nodenames:
3711 info = nodeinfo.get(node, None)
3713 raise errors.OpPrereqError("Cannot get current information"
3714 " from node '%s'" % node)
3715 vg_free = info.get('vg_free', None)
3716 if not isinstance(vg_free, int):
3717 raise errors.OpPrereqError("Can't compute free disk space on"
3719 if req_size > info['vg_free']:
3720 raise errors.OpPrereqError("Not enough disk space on target node %s."
3721 " %d MB available, %d MB required" %
3722 (node, info['vg_free'], req_size))
3725 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3727 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3728 " primary node" % self.op.os_type)
3730 if self.op.kernel_path == constants.VALUE_NONE:
3731 raise errors.OpPrereqError("Can't set instance kernel to none")
3734 # bridge check on primary node
3735 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3736 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3737 " destination node '%s'" %
3738 (self.op.bridge, pnode.name))
3740 # memory check on primary node
3742 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3743 "creating instance %s" % self.op.instance_name,
3746 # hvm_cdrom_image_path verification
3747 if self.op.hvm_cdrom_image_path is not None:
3748 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3749 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3750 " be an absolute path or None, not %s" %
3751 self.op.hvm_cdrom_image_path)
3752 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3753 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3754 " regular file or a symlink pointing to"
3755 " an existing regular file, not %s" %
3756 self.op.hvm_cdrom_image_path)
3758 # vnc_bind_address verification
3759 if self.op.vnc_bind_address is not None:
3760 if not utils.IsValidIP(self.op.vnc_bind_address):
3761 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3762 " like a valid IP address" %
3763 self.op.vnc_bind_address)
3765 # Xen HVM device type checks
3766 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3767 if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3768 raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3769 " hypervisor" % self.op.hvm_nic_type)
3770 if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3771 raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3772 " hypervisor" % self.op.hvm_disk_type)
3775 self.instance_status = 'up'
3777 self.instance_status = 'down'
3779 def Exec(self, feedback_fn):
3780 """Create and add the instance to the cluster.
3783 instance = self.op.instance_name
3784 pnode_name = self.pnode.name
3786 if self.op.mac == "auto":
3787 mac_address = self.cfg.GenerateMAC()
3789 mac_address = self.op.mac
3791 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3792 if self.inst_ip is not None:
3793 nic.ip = self.inst_ip
3795 ht_kind = self.sstore.GetHypervisorType()
3796 if ht_kind in constants.HTS_REQ_PORT:
3797 network_port = self.cfg.AllocatePort()
3801 if self.op.vnc_bind_address is None:
3802 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3804 disks = _GenerateDiskTemplate(self.cfg,
3805 self.op.disk_template,
3806 instance, pnode_name,
3807 self.secondaries, self.op.disk_size,
3810 iobj = objects.Instance(name=instance, os=self.op.os_type,
3811 primary_node=pnode_name,
3812 memory=self.op.mem_size,
3813 vcpus=self.op.vcpus,
3814 nics=[nic], disks=disks,
3815 disk_template=self.op.disk_template,
3816 status=self.instance_status,
3817 network_port=network_port,
3818 kernel_path=self.op.kernel_path,
3819 initrd_path=self.op.initrd_path,
3820 hvm_boot_order=self.op.hvm_boot_order,
3821 hvm_acpi=self.op.hvm_acpi,
3822 hvm_pae=self.op.hvm_pae,
3823 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3824 vnc_bind_address=self.op.vnc_bind_address,
3825 hvm_nic_type=self.op.hvm_nic_type,
3826 hvm_disk_type=self.op.hvm_disk_type,
3829 feedback_fn("* creating instance disks...")
3830 if not _CreateDisks(self.cfg, iobj):
3831 _RemoveDisks(iobj, self.cfg)
3832 raise errors.OpExecError("Device creation failed, reverting...")
3834 feedback_fn("adding instance %s to cluster config" % instance)
3836 self.cfg.AddInstance(iobj)
3838 if self.op.wait_for_sync:
3839 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3840 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3841 # make sure the disks are not degraded (still sync-ing is ok)
3843 feedback_fn("* checking mirrors status")
3844 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3849 _RemoveDisks(iobj, self.cfg)
3850 self.cfg.RemoveInstance(iobj.name)
3851 raise errors.OpExecError("There are some degraded disks for"
3854 feedback_fn("creating os for instance %s on node %s" %
3855 (instance, pnode_name))
3857 if iobj.disk_template != constants.DT_DISKLESS:
3858 if self.op.mode == constants.INSTANCE_CREATE:
3859 feedback_fn("* running the instance OS create scripts...")
3860 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3861 raise errors.OpExecError("could not add os for instance %s"
3863 (instance, pnode_name))
3865 elif self.op.mode == constants.INSTANCE_IMPORT:
3866 feedback_fn("* running the instance OS import scripts...")
3867 src_node = self.op.src_node
3868 src_image = self.src_image
3869 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3870 src_node, src_image):
3871 raise errors.OpExecError("Could not import os for instance"
3873 (instance, pnode_name))
3875 # also checked in the prereq part
3876 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3880 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3881 feedback_fn("* starting instance...")
3882 if not rpc.call_instance_start(pnode_name, iobj, None):
3883 raise errors.OpExecError("Could not start instance")
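# Hedged usage note (added): the opcode driving this LU must supply at least
# the fields listed in _OP_REQP above; a hypothetical minimal request could be
#
#   instance_name="inst1.example.com", mem_size=512, vcpus=1,
#   disk_size=10240, swap_size=4096, disk_template=constants.DT_DRBD8,
#   mode=constants.INSTANCE_CREATE, start=True, wait_for_sync=True,
#   ip_check=True, mac="auto"
#
# plus either a primary node (pnode) or an iallocator name, as CheckPrereq
# enforces that exactly one of the two is given.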
3886 class LUConnectConsole(NoHooksLU):
3887 """Connect to an instance's console.
3889 This is somewhat special in that it returns the command line that
3890 you need to run on the master node in order to connect to the console.
3894 _OP_REQP = ["instance_name"]
3896 def CheckPrereq(self):
3897 """Check prerequisites.
3899 This checks that the instance is in the cluster.
3902 instance = self.cfg.GetInstanceInfo(
3903 self.cfg.ExpandInstanceName(self.op.instance_name))
3904 if instance is None:
3905 raise errors.OpPrereqError("Instance '%s' not known" %
3906 self.op.instance_name)
3907 self.instance = instance
3909 def Exec(self, feedback_fn):
3910 """Connect to the console of an instance
3913 instance = self.instance
3914 node = instance.primary_node
3916 node_insts = rpc.call_instance_list([node])[node]
3917 if node_insts is False:
3918 raise errors.OpExecError("Can't connect to node %s." % node)
3920 if instance.name not in node_insts:
3921 raise errors.OpExecError("Instance %s is not running." % instance.name)
3923 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3925 hyper = hypervisor.GetHypervisor()
3926 console_cmd = hyper.GetShellCommandForConsole(instance)
3928 argv = ["ssh", "-q", "-t"]
3929 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3930 argv.extend(ssh.BATCH_MODE_OPTS)
3932 argv.append(console_cmd)
3936 class LUAddMDDRBDComponent(LogicalUnit):
3937 """Adda new mirror member to an instance's disk.
3940 HPATH = "mirror-add"
3941 HTYPE = constants.HTYPE_INSTANCE
3942 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3944 def BuildHooksEnv(self):
3947 This runs on the master, the primary and all the secondaries.
3951 "NEW_SECONDARY": self.op.remote_node,
3952 "DISK_NAME": self.op.disk_name,
3954 env.update(_BuildInstanceHookEnvByObject(self.instance))
3955 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3956 self.op.remote_node,] + list(self.instance.secondary_nodes)
3959 def CheckPrereq(self):
3960 """Check prerequisites.
3962 This checks that the instance is in the cluster.
3965 instance = self.cfg.GetInstanceInfo(
3966 self.cfg.ExpandInstanceName(self.op.instance_name))
3967 if instance is None:
3968 raise errors.OpPrereqError("Instance '%s' not known" %
3969 self.op.instance_name)
3970 self.instance = instance
3972 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3973 if remote_node is None:
3974 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3975 self.remote_node = remote_node
3977 if remote_node == instance.primary_node:
3978 raise errors.OpPrereqError("The specified node is the primary node of"
3981 if instance.disk_template != constants.DT_REMOTE_RAID1:
3982 raise errors.OpPrereqError("Instance's disk layout is not"
3984 for disk in instance.disks:
3985 if disk.iv_name == self.op.disk_name:
3988 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3989 " instance." % self.op.disk_name)
3990 if len(disk.children) > 1:
3991 raise errors.OpPrereqError("The device already has two slave devices."
3992 " This would create a 3-disk raid1 which we"
3996 def Exec(self, feedback_fn):
3997 """Add the mirror component
4001 instance = self.instance
4003 remote_node = self.remote_node
4004 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
4005 names = _GenerateUniqueNames(self.cfg, lv_names)
4006 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
4007 remote_node, disk.size, names)
4009 logger.Info("adding new mirror component on secondary")
4011 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
4013 _GetInstanceInfoText(instance)):
4014 raise errors.OpExecError("Failed to create new component on secondary"
4015 " node %s" % remote_node)
4017 logger.Info("adding new mirror component on primary")
4019 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
4021 _GetInstanceInfoText(instance)):
4022 # remove secondary dev
4023 self.cfg.SetDiskID(new_drbd, remote_node)
4024 rpc.call_blockdev_remove(remote_node, new_drbd)
4025 raise errors.OpExecError("Failed to create volume on primary")
4027 # the device exists now
4028 # call the primary node to add the mirror to md
4029 logger.Info("adding new mirror component to md")
4030 if not rpc.call_blockdev_addchildren(instance.primary_node,
4032 logger.Error("Can't add mirror component to md!")
4033 self.cfg.SetDiskID(new_drbd, remote_node)
4034 if not rpc.call_blockdev_remove(remote_node, new_drbd):
4035 logger.Error("Can't rollback on secondary")
4036 self.cfg.SetDiskID(new_drbd, instance.primary_node)
4037 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4038 logger.Error("Can't rollback on primary")
4039 raise errors.OpExecError("Can't add mirror component to md array")
4041 disk.children.append(new_drbd)
4043 self.cfg.AddInstance(instance)
4045 _WaitForSync(self.cfg, instance, self.proc)
4050 class LURemoveMDDRBDComponent(LogicalUnit):
4051 """Remove a component from a remote_raid1 disk.
4054 HPATH = "mirror-remove"
4055 HTYPE = constants.HTYPE_INSTANCE
4056 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
4058 def BuildHooksEnv(self):
4061 This runs on the master, the primary and all the secondaries.
4065 "DISK_NAME": self.op.disk_name,
4066 "DISK_ID": self.op.disk_id,
4067 "OLD_SECONDARY": self.old_secondary,
4069 env.update(_BuildInstanceHookEnvByObject(self.instance))
4070 nl = [self.sstore.GetMasterNode(),
4071 self.instance.primary_node] + list(self.instance.secondary_nodes)
4074 def CheckPrereq(self):
4075 """Check prerequisites.
4077 This checks that the instance is in the cluster.
4080 instance = self.cfg.GetInstanceInfo(
4081 self.cfg.ExpandInstanceName(self.op.instance_name))
4082 if instance is None:
4083 raise errors.OpPrereqError("Instance '%s' not known" %
4084 self.op.instance_name)
4085 self.instance = instance
4087 if instance.disk_template != constants.DT_REMOTE_RAID1:
4088 raise errors.OpPrereqError("Instance's disk layout is not"
4090 for disk in instance.disks:
4091 if disk.iv_name == self.op.disk_name:
4094 raise errors.OpPrereqError("Can't find this device ('%s') in the"
4095 " instance." % self.op.disk_name)
4096 for child in disk.children:
4097 if (child.dev_type == constants.LD_DRBD7 and
4098 child.logical_id[2] == self.op.disk_id):
4101 raise errors.OpPrereqError("Can't find the device with this port.")
4103 if len(disk.children) < 2:
4104 raise errors.OpPrereqError("Cannot remove the last component from"
4108 if self.child.logical_id[0] == instance.primary_node:
4112 self.old_secondary = self.child.logical_id[oid]
4114 def Exec(self, feedback_fn):
4115 """Remove the mirror component
4118 instance = self.instance
4121 logger.Info("remove mirror component")
4122 self.cfg.SetDiskID(disk, instance.primary_node)
4123 if not rpc.call_blockdev_removechildren(instance.primary_node,
4125 raise errors.OpExecError("Can't remove child from mirror.")
4127 for node in child.logical_id[:2]:
4128 self.cfg.SetDiskID(child, node)
4129 if not rpc.call_blockdev_remove(node, child):
4130 logger.Error("Warning: failed to remove device from node %s,"
4131 " continuing operation." % node)
4133 disk.children.remove(child)
4134 self.cfg.AddInstance(instance)
4137 class LUReplaceDisks(LogicalUnit):
4138 """Replace the disks of an instance.
4141 HPATH = "mirrors-replace"
4142 HTYPE = constants.HTYPE_INSTANCE
4143 _OP_REQP = ["instance_name", "mode", "disks"]
4145 def _RunAllocator(self):
4146 """Compute a new secondary node using an IAllocator.
4149 ial = IAllocator(self.cfg, self.sstore,
4150 mode=constants.IALLOCATOR_MODE_RELOC,
4151 name=self.op.instance_name,
4152 relocate_from=[self.sec_node])
4154 ial.Run(self.op.iallocator)
4157 raise errors.OpPrereqError("Can't compute nodes using"
4158 " iallocator '%s': %s" % (self.op.iallocator,
4160 if len(ial.nodes) != ial.required_nodes:
4161 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4162 " of nodes (%s), required %s" %
4163 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
4164 self.op.remote_node = ial.nodes[0]
4165 logger.ToStdout("Selected new secondary for the instance: %s" %
4166 self.op.remote_node)
4168 def BuildHooksEnv(self):
4171 This runs on the master, the primary and all the secondaries.
4175 "MODE": self.op.mode,
4176 "NEW_SECONDARY": self.op.remote_node,
4177 "OLD_SECONDARY": self.instance.secondary_nodes[0],
4179 env.update(_BuildInstanceHookEnvByObject(self.instance))
4181 self.sstore.GetMasterNode(),
4182 self.instance.primary_node,
4184 if self.op.remote_node is not None:
4185 nl.append(self.op.remote_node)
4188 def CheckPrereq(self):
4189 """Check prerequisites.
4191 This checks that the instance is in the cluster.
4194 if not hasattr(self.op, "remote_node"):
4195 self.op.remote_node = None
4197 instance = self.cfg.GetInstanceInfo(
4198 self.cfg.ExpandInstanceName(self.op.instance_name))
4199 if instance is None:
4200 raise errors.OpPrereqError("Instance '%s' not known" %
4201 self.op.instance_name)
4202 self.instance = instance
4203 self.op.instance_name = instance.name
4205 if instance.disk_template not in constants.DTS_NET_MIRROR:
4206 raise errors.OpPrereqError("Instance's disk layout is not"
4207 " network mirrored.")
4209 if len(instance.secondary_nodes) != 1:
4210 raise errors.OpPrereqError("The instance has a strange layout,"
4211 " expected one secondary but found %d" %
4212 len(instance.secondary_nodes))
4214 self.sec_node = instance.secondary_nodes[0]
4216 ia_name = getattr(self.op, "iallocator", None)
4217 if ia_name is not None:
4218 if self.op.remote_node is not None:
4219 raise errors.OpPrereqError("Give either the iallocator or the new"
4220 " secondary, not both")
4221 self._RunAllocator()
4223 remote_node = self.op.remote_node
4224 if remote_node is not None:
4225 remote_node = self.cfg.ExpandNodeName(remote_node)
4226 if remote_node is None:
4227 raise errors.OpPrereqError("Node '%s' not known" %
4228 self.op.remote_node)
4229 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4231 self.remote_node_info = None
4232 if remote_node == instance.primary_node:
4233 raise errors.OpPrereqError("The specified node is the primary node of"
4235 elif remote_node == self.sec_node:
4236 if self.op.mode == constants.REPLACE_DISK_SEC:
4237 # this is for DRBD8, where we can't execute the same mode of
4238 # replacement as for drbd7 (no different port allocated)
4239 raise errors.OpPrereqError("Same secondary given, cannot execute"
4241 # the user gave the current secondary, switch to
4242 # 'no-replace-secondary' mode for drbd7
4244 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
4245 self.op.mode != constants.REPLACE_DISK_ALL):
4246 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
4247 " disks replacement, not individual ones")
4248 if instance.disk_template == constants.DT_DRBD8:
4249 if (self.op.mode == constants.REPLACE_DISK_ALL and
4250 remote_node is not None):
4251 # switch to replace secondary mode
4252 self.op.mode = constants.REPLACE_DISK_SEC
4254 if self.op.mode == constants.REPLACE_DISK_ALL:
4255 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4256 " secondary disk replacement, not"
4258 elif self.op.mode == constants.REPLACE_DISK_PRI:
4259 if remote_node is not None:
4260 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4261 " the secondary while doing a primary"
4262 " node disk replacement")
4263 self.tgt_node = instance.primary_node
4264 self.oth_node = instance.secondary_nodes[0]
4265 elif self.op.mode == constants.REPLACE_DISK_SEC:
4266 self.new_node = remote_node # this can be None, in which case
4267 # we don't change the secondary
4268 self.tgt_node = instance.secondary_nodes[0]
4269 self.oth_node = instance.primary_node
4271 raise errors.ProgrammerError("Unhandled disk replace mode")
4273 for name in self.op.disks:
4274 if instance.FindDisk(name) is None:
4275 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4276 (name, instance.name))
4277 self.op.remote_node = remote_node
4279 def _ExecRR1(self, feedback_fn):
4280 """Replace the disks of an instance.
4283 instance = self.instance
4286 if self.op.remote_node is None:
4287 remote_node = self.sec_node
4289 remote_node = self.op.remote_node
4291 for dev in instance.disks:
4293 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4294 names = _GenerateUniqueNames(cfg, lv_names)
4295 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
4296 remote_node, size, names)
4297 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
4298 logger.Info("adding new mirror component on secondary for %s" %
4301 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
4303 _GetInstanceInfoText(instance)):
4304 raise errors.OpExecError("Failed to create new component on secondary"
4305 " node %s. Full abort, cleanup manually!" %
4308 logger.Info("adding new mirror component on primary")
4310 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
4312 _GetInstanceInfoText(instance)):
4313 # remove secondary dev
4314 cfg.SetDiskID(new_drbd, remote_node)
4315 rpc.call_blockdev_remove(remote_node, new_drbd)
4316 raise errors.OpExecError("Failed to create volume on primary!"
4317 " Full abort, cleanup manually!!")
4319 # the device exists now
4320 # call the primary node to add the mirror to md
4321 logger.Info("adding new mirror component to md")
4322 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
4324 logger.Error("Can't add mirror component to md!")
4325 cfg.SetDiskID(new_drbd, remote_node)
4326 if not rpc.call_blockdev_remove(remote_node, new_drbd):
4327 logger.Error("Can't rollback on secondary")
4328 cfg.SetDiskID(new_drbd, instance.primary_node)
4329 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4330 logger.Error("Can't rollback on primary")
4331 raise errors.OpExecError("Full abort, cleanup manually!!")
4333 dev.children.append(new_drbd)
4334 cfg.AddInstance(instance)
4336 # this can fail as the old devices are degraded and _WaitForSync
4337 # does a combined result over all disks, so we don't check its return value
4339 _WaitForSync(cfg, instance, self.proc, unlock=True)
4341 # so check manually all the devices
4342 for name in iv_names:
4343 dev, child, new_drbd = iv_names[name]
4344 cfg.SetDiskID(dev, instance.primary_node)
4345 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4347 raise errors.OpExecError("MD device %s is degraded!" % name)
4348 cfg.SetDiskID(new_drbd, instance.primary_node)
4349 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
4351 raise errors.OpExecError("New drbd device %s is degraded!" % name)
4353 for name in iv_names:
4354 dev, child, new_drbd = iv_names[name]
4355 logger.Info("remove mirror %s component" % name)
4356 cfg.SetDiskID(dev, instance.primary_node)
4357 if not rpc.call_blockdev_removechildren(instance.primary_node,
4359 logger.Error("Can't remove child from mirror, aborting"
4360 " *this device cleanup*.\nYou need to clean up manually!!")
4363 for node in child.logical_id[:2]:
4364 logger.Info("remove child device on %s" % node)
4365 cfg.SetDiskID(child, node)
4366 if not rpc.call_blockdev_remove(node, child):
4367 logger.Error("Warning: failed to remove device from node %s,"
4368 " continuing operation." % node)
4370 dev.children.remove(child)
4372 cfg.AddInstance(instance)
4374 def _ExecD8DiskOnly(self, feedback_fn):
4375 """Replace a disk on the primary or secondary for drbd8.
4377 The algorithm for replace is quite complicated:
4378 - for each disk to be replaced:
4379 - create new LVs on the target node with unique names
4380 - detach old LVs from the drbd device
4381 - rename old LVs to name_replaced.<time_t>
4382 - rename new LVs to old LVs
4383 - attach the new LVs (with the old names now) to the drbd device
4384 - wait for sync across all devices
4385 - for each modified disk:
4386 - remove old LVs (which have the name name_replaced.<time_t>)
4388 Failures are not very well handled.
4392 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4393 instance = self.instance
4395 vgname = self.cfg.GetVGName()
4398 tgt_node = self.tgt_node
4399 oth_node = self.oth_node
4401 # Step: check device activation
4402 self.proc.LogStep(1, steps_total, "check device existence")
4403 info("checking volume groups")
4404 my_vg = cfg.GetVGName()
4405 results = rpc.call_vg_list([oth_node, tgt_node])
4407 raise errors.OpExecError("Can't list volume groups on the nodes")
4408 for node in oth_node, tgt_node:
4409 res = results.get(node, False)
4410 if not res or my_vg not in res:
4411 raise errors.OpExecError("Volume group '%s' not found on %s" %
4413 for dev in instance.disks:
4414 if not dev.iv_name in self.op.disks:
4416 for node in tgt_node, oth_node:
4417 info("checking %s on %s" % (dev.iv_name, node))
4418 cfg.SetDiskID(dev, node)
4419 if not rpc.call_blockdev_find(node, dev):
4420 raise errors.OpExecError("Can't find device %s on node %s" %
4421 (dev.iv_name, node))
4423 # Step: check other node consistency
4424 self.proc.LogStep(2, steps_total, "check peer consistency")
4425 for dev in instance.disks:
4426 if not dev.iv_name in self.op.disks:
4428 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4429 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4430 oth_node==instance.primary_node):
4431 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4432 " to replace disks on this node (%s)" %
4433 (oth_node, tgt_node))
4435 # Step: create new storage
4436 self.proc.LogStep(3, steps_total, "allocate new storage")
4437 for dev in instance.disks:
4438 if not dev.iv_name in self.op.disks:
4441 cfg.SetDiskID(dev, tgt_node)
4442 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4443 names = _GenerateUniqueNames(cfg, lv_names)
4444 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4445 logical_id=(vgname, names[0]))
4446 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4447 logical_id=(vgname, names[1]))
4448 new_lvs = [lv_data, lv_meta]
4449 old_lvs = dev.children
4450 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4451 info("creating new local storage on %s for %s" %
4452 (tgt_node, dev.iv_name))
4453 # since we *always* want to create this LV, we use the
4454 # _Create...OnPrimary (which forces the creation), even if we
4455 # are talking about the secondary node
4456 for new_lv in new_lvs:
4457 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4458 _GetInstanceInfoText(instance)):
4459 raise errors.OpExecError("Failed to create new LV named '%s' on"
4461 (new_lv.logical_id[1], tgt_node))
4463 # Step: for each lv, detach+rename*2+attach
4464 self.proc.LogStep(4, steps_total, "change drbd configuration")
4465 for dev, old_lvs, new_lvs in iv_names.itervalues():
4466 info("detaching %s drbd from local storage" % dev.iv_name)
4467 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4468 raise errors.OpExecError("Can't detach drbd from local storage on node"
4469 " %s for device %s" % (tgt_node, dev.iv_name))
4471 #cfg.Update(instance)
4473 # ok, we created the new LVs, so now we know we have the needed
4474 # storage; as such, we proceed on the target node to rename
4475 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4476 # using the assumption that logical_id == physical_id (which in
4477 # turn is the unique_id on that node)
4479 # FIXME(iustin): use a better name for the replaced LVs
4480 temp_suffix = int(time.time())
4481 ren_fn = lambda d, suff: (d.physical_id[0],
4482 d.physical_id[1] + "_replaced-%s" % suff)
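# For example (volume group and LV names purely illustrative), ren_fn maps
# ('xenvg', 'sda_data') to ('xenvg', 'sda_data_replaced-1200000000'),
# using the current time captured in temp_suffix as the suffix.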
4483 # build the rename list based on what LVs exist on the node
4485 for to_ren in old_lvs:
4486 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4487 if find_res is not None: # device exists
4488 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4490 info("renaming the old LVs on the target node")
4491 if not rpc.call_blockdev_rename(tgt_node, rlist):
4492 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4493 # now we rename the new LVs to the old LVs
4494 info("renaming the new LVs on the target node")
4495 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4496 if not rpc.call_blockdev_rename(tgt_node, rlist):
4497 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4499 for old, new in zip(old_lvs, new_lvs):
4500 new.logical_id = old.logical_id
4501 cfg.SetDiskID(new, tgt_node)
4503 for disk in old_lvs:
4504 disk.logical_id = ren_fn(disk, temp_suffix)
4505 cfg.SetDiskID(disk, tgt_node)
4507 # now that the new lvs have the old name, we can add them to the device
4508 info("adding new mirror component on %s" % tgt_node)
4509 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4510 for new_lv in new_lvs:
4511 if not rpc.call_blockdev_remove(tgt_node, new_lv):
4512 warning("Can't rollback device %s", hint="manually cleanup unused"
4514 raise errors.OpExecError("Can't add local storage to drbd")
4516 dev.children = new_lvs
4517 cfg.Update(instance)
4519 # Step: wait for sync
4521 # this can fail as the old devices are degraded and _WaitForSync
4522 # does a combined result over all disks, so we don't check its return value
4524 self.proc.LogStep(5, steps_total, "sync devices")
4525 _WaitForSync(cfg, instance, self.proc, unlock=True)
4527 # so check manually all the devices
4528 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4529 cfg.SetDiskID(dev, instance.primary_node)
4530 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4532 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4534 # Step: remove old storage
4535 self.proc.LogStep(6, steps_total, "removing old storage")
4536 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4537 info("remove logical volumes for %s" % name)
4539 cfg.SetDiskID(lv, tgt_node)
4540 if not rpc.call_blockdev_remove(tgt_node, lv):
4541 warning("Can't remove old LV", hint="manually remove unused LVs")
4544 def _ExecD8Secondary(self, feedback_fn):
4545 """Replace the secondary node for drbd8.
4547 The algorithm for replace is quite complicated:
4548 - for all disks of the instance:
4549 - create new LVs on the new node with same names
4550 - shutdown the drbd device on the old secondary
4551 - disconnect the drbd network on the primary
4552 - create the drbd device on the new secondary
4553 - network attach the drbd on the primary, using an artifice:
4554 the drbd code for Attach() will connect to the network if it
4555 finds a device which is connected to the good local disks but
4557 - wait for sync across all devices
4558 - remove all disks from the old secondary
4560 Failures are not very well handled.
4564 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4565 instance = self.instance
4567 vgname = self.cfg.GetVGName()
4570 old_node = self.tgt_node
4571 new_node = self.new_node
4572 pri_node = instance.primary_node
4574 # Step: check device activation
4575 self.proc.LogStep(1, steps_total, "check device existence")
4576 info("checking volume groups")
4577 my_vg = cfg.GetVGName()
4578 results = rpc.call_vg_list([pri_node, new_node])
4580 raise errors.OpExecError("Can't list volume groups on the nodes")
4581 for node in pri_node, new_node:
4582 res = results.get(node, False)
4583 if not res or my_vg not in res:
4584 raise errors.OpExecError("Volume group '%s' not found on %s" %
4586 for dev in instance.disks:
4587 if not dev.iv_name in self.op.disks:
4589 info("checking %s on %s" % (dev.iv_name, pri_node))
4590 cfg.SetDiskID(dev, pri_node)
4591 if not rpc.call_blockdev_find(pri_node, dev):
4592 raise errors.OpExecError("Can't find device %s on node %s" %
4593 (dev.iv_name, pri_node))
4595 # Step: check other node consistency
4596 self.proc.LogStep(2, steps_total, "check peer consistency")
4597 for dev in instance.disks:
4598 if not dev.iv_name in self.op.disks:
4600 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4601 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4602 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4603 " unsafe to replace the secondary" %
4606 # Step: create new storage
4607 self.proc.LogStep(3, steps_total, "allocate new storage")
4608 for dev in instance.disks:
4610 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4611 # since we *always* want to create this LV, we use the
4612 # _Create...OnPrimary (which forces the creation), even if we
4613 # are talking about the secondary node
4614 for new_lv in dev.children:
4615 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4616 _GetInstanceInfoText(instance)):
4617 raise errors.OpExecError("Failed to create new LV named '%s' on"
4619 (new_lv.logical_id[1], new_node))
4621 iv_names[dev.iv_name] = (dev, dev.children)
4623 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4624 for dev in instance.disks:
4626 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4627 # create new devices on new_node
4628 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4629 logical_id=(pri_node, new_node,
4631 children=dev.children)
4632 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4634 _GetInstanceInfoText(instance)):
4635 raise errors.OpExecError("Failed to create new DRBD on"
4636 " node '%s'" % new_node)
4638 for dev in instance.disks:
4639 # we have new devices, shutdown the drbd on the old secondary
4640 info("shutting down drbd for %s on old node" % dev.iv_name)
4641 cfg.SetDiskID(dev, old_node)
4642 if not rpc.call_blockdev_shutdown(old_node, dev):
4643 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4644 hint="Please cleanup this device manually as soon as possible")
4646 info("detaching primary drbds from the network (=> standalone)")
4648 for dev in instance.disks:
4649 cfg.SetDiskID(dev, pri_node)
4650 # set the physical (unique in bdev terms) id to None, meaning
4651 # detach from network
4652 dev.physical_id = (None,) * len(dev.physical_id)
4653 # and 'find' the device, which will 'fix' it to match the
4655 if rpc.call_blockdev_find(pri_node, dev):
4658 warning("Failed to detach drbd %s from network, unusual case" %
4662 # no detaches succeeded (very unlikely)
4663 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4665 # if we managed to detach at least one, we update all the disks of
4666 # the instance to point to the new secondary
4667 info("updating instance configuration")
4668 for dev in instance.disks:
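# The logical_id of these devices starts with the two peer nodes; below we
# swap in new_node as the secondary while keeping logical_id[2:] (the DRBD
# port and any further identifiers) unchanged.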
4669 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4670 cfg.SetDiskID(dev, pri_node)
4671 cfg.Update(instance)
4673 # and now perform the drbd attach
4674 info("attaching primary drbds to new secondary (standalone => connected)")
4676 for dev in instance.disks:
4677 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4678 # since the attach is smart, it's enough to 'find' the device,
4679 # it will automatically activate the network, if the physical_id
4681 cfg.SetDiskID(dev, pri_node)
4682 if not rpc.call_blockdev_find(pri_node, dev):
4683 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4684 "please do a gnt-instance info to see the status of disks")
4686 # this can fail as the old devices are degraded and _WaitForSync
4687 # does a combined result over all disks, so we don't check its return value
4689 self.proc.LogStep(5, steps_total, "sync devices")
4690 _WaitForSync(cfg, instance, self.proc, unlock=True)
4692 # so check manually all the devices
4693 for name, (dev, old_lvs) in iv_names.iteritems():
4694 cfg.SetDiskID(dev, pri_node)
4695 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4697 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4699 self.proc.LogStep(6, steps_total, "removing old storage")
4700 for name, (dev, old_lvs) in iv_names.iteritems():
4701 info("remove logical volumes for %s" % name)
4703 cfg.SetDiskID(lv, old_node)
4704 if not rpc.call_blockdev_remove(old_node, lv):
4705 warning("Can't remove LV on old secondary",
4706 hint="Cleanup stale volumes by hand")
4708 def Exec(self, feedback_fn):
4709 """Execute disk replacement.
4711 This dispatches the disk replacement to the appropriate handler.
4714 instance = self.instance
4716 # Activate the instance disks if we're replacing them on a down instance
4717 if instance.status == "down":
4718 op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
4719 self.proc.ChainOpCode(op)
4721 if instance.disk_template == constants.DT_REMOTE_RAID1:
4723 elif instance.disk_template == constants.DT_DRBD8:
4724 if self.op.remote_node is None:
4725 fn = self._ExecD8DiskOnly
4727 fn = self._ExecD8Secondary
4729 raise errors.ProgrammerError("Unhandled disk replacement case")
4731 ret = fn(feedback_fn)
4733 # Deactivate the instance disks if we're replacing them on a down instance
4734 if instance.status == "down":
4735 op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
4736 self.proc.ChainOpCode(op)
4741 class LUGrowDisk(LogicalUnit):
4742 """Grow a disk of an instance.
4746 HTYPE = constants.HTYPE_INSTANCE
4747 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4749 def BuildHooksEnv(self):
4752 This runs on the master, the primary and all the secondaries.
4756 "DISK": self.op.disk,
4757 "AMOUNT": self.op.amount,
4759 env.update(_BuildInstanceHookEnvByObject(self.instance))
4761 self.sstore.GetMasterNode(),
4762 self.instance.primary_node,
4766 def CheckPrereq(self):
4767 """Check prerequisites.
4769 This checks that the instance is in the cluster.
4772 instance = self.cfg.GetInstanceInfo(
4773 self.cfg.ExpandInstanceName(self.op.instance_name))
4774 if instance is None:
4775 raise errors.OpPrereqError("Instance '%s' not known" %
4776 self.op.instance_name)
4778 if self.op.amount <= 0:
4779 raise errors.OpPrereqError("Invalid grow-by amount: %s" % self.op.amount)
4781 self.instance = instance
4782 self.op.instance_name = instance.name
4784 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4785 raise errors.OpPrereqError("Instance's disk layout does not support"
4788 self.disk = instance.FindDisk(self.op.disk)
4789 if self.disk is None:
4790 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4791 (self.op.disk, instance.name))
4793 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4794 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4795 for node in nodenames:
4796 info = nodeinfo.get(node, None)
4798 raise errors.OpPrereqError("Cannot get current information"
4799 " from node '%s'" % node)
4800 vg_free = info.get('vg_free', None)
4801 if not isinstance(vg_free, int):
4802 raise errors.OpPrereqError("Can't compute free disk space on"
4804 if self.op.amount > info['vg_free']:
4805 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4806 " %d MiB available, %d MiB required" %
4807 (node, info['vg_free'], self.op.amount))
4808 is_primary = (node == instance.primary_node)
4809 if not _CheckDiskConsistency(self.cfg, self.disk, node, is_primary):
4810 raise errors.OpPrereqError("Disk %s is degraded or not fully"
4811 " synchronized on node %s,"
4812 " aborting grow." % (self.op.disk, node))
4814 def Exec(self, feedback_fn):
4815 """Execute disk grow.
4818 instance = self.instance
4820 for node in (instance.secondary_nodes + (instance.primary_node,)):
4821 self.cfg.SetDiskID(disk, node)
4822 result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4823 if not result or not isinstance(result, tuple) or len(result) != 2:
4824 raise errors.OpExecError("grow request failed on node %s" % node)
4826 raise errors.OpExecError("grow request failed on node %s: %s" %
4828 disk.RecordGrow(self.op.amount)
4829 self.cfg.Update(instance)
4830 if self.op.wait_for_sync:
4831 disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4833 logger.Error("Warning: disk sync-ing has not returned a good status.\n"
4834 " Please check the instance.")
4837 class LUQueryInstanceData(NoHooksLU):
4838 """Query runtime instance data.
4841 _OP_REQP = ["instances"]
4843 def CheckPrereq(self):
4844 """Check prerequisites.
4846 This only checks the optional instance list against the existing names.
4849 if not isinstance(self.op.instances, list):
4850 raise errors.OpPrereqError("Invalid argument type 'instances'")
4851 if self.op.instances:
4852 self.wanted_instances = []
4853 names = self.op.instances
4855 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4856 if instance is None:
4857 raise errors.OpPrereqError("No such instance name '%s'" % name)
4858 self.wanted_instances.append(instance)
4860 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4861 in self.cfg.GetInstanceList()]
4865 def _ComputeDiskStatus(self, instance, snode, dev):
4866 """Compute block device status.
4869 self.cfg.SetDiskID(dev, instance.primary_node)
4870 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4871 if dev.dev_type in constants.LDS_DRBD:
4872 # we change the snode then (otherwise we use the one passed in)
4873 if dev.logical_id[0] == instance.primary_node:
4874 snode = dev.logical_id[1]
4876 snode = dev.logical_id[0]
4879 self.cfg.SetDiskID(dev, snode)
4880 dev_sstatus = rpc.call_blockdev_find(snode, dev)
4885 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4886 for child in dev.children]
4891 "iv_name": dev.iv_name,
4892 "dev_type": dev.dev_type,
4893 "logical_id": dev.logical_id,
4894 "physical_id": dev.physical_id,
4895 "pstatus": dev_pstatus,
4896 "sstatus": dev_sstatus,
4897 "children": dev_children,
4902 def Exec(self, feedback_fn):
4903 """Gather and return data"""
4905 for instance in self.wanted_instances:
4906 remote_info = rpc.call_instance_info(instance.primary_node,
4908 if remote_info and "state" in remote_info:
4911 remote_state = "down"
4912 if instance.status == "down":
4913 config_state = "down"
4917 disks = [self._ComputeDiskStatus(instance, None, device)
4918 for device in instance.disks]
4921 "name": instance.name,
4922 "config_state": config_state,
4923 "run_state": remote_state,
4924 "pnode": instance.primary_node,
4925 "snodes": instance.secondary_nodes,
4927 "memory": instance.memory,
4928 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4930 "vcpus": instance.vcpus,
4933 htkind = self.sstore.GetHypervisorType()
4934 if htkind == constants.HT_XEN_PVM30:
4935 idict["kernel_path"] = instance.kernel_path
4936 idict["initrd_path"] = instance.initrd_path
4938 if htkind == constants.HT_XEN_HVM31:
4939 idict["hvm_boot_order"] = instance.hvm_boot_order
4940 idict["hvm_acpi"] = instance.hvm_acpi
4941 idict["hvm_pae"] = instance.hvm_pae
4942 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4943 idict["hvm_nic_type"] = instance.hvm_nic_type
4944 idict["hvm_disk_type"] = instance.hvm_disk_type
4946 if htkind in constants.HTS_REQ_PORT:
4947 if instance.network_port is None:
4948 vnc_console_port = None
4949 elif instance.vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4950 vnc_console_port = "%s:%s" % (instance.primary_node,
4951 instance.network_port)
4952 elif instance.vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4953 vnc_console_port = "%s:%s on node %s" % (instance.vnc_bind_address,
4954 instance.network_port,
4955 instance.primary_node)
4957 vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4958 instance.network_port)
4959 idict["vnc_console_port"] = vnc_console_port
4960 idict["vnc_bind_address"] = instance.vnc_bind_address
4961 idict["network_port"] = instance.network_port
4963 result[instance.name] = idict
4968 class LUSetInstanceParms(LogicalUnit):
4969 """Modifies an instance's parameters.
4972 HPATH = "instance-modify"
4973 HTYPE = constants.HTYPE_INSTANCE
4974 _OP_REQP = ["instance_name"]
4976 def BuildHooksEnv(self):
4979 This runs on the master, primary and secondaries.
4984 args['memory'] = self.mem
4986 args['vcpus'] = self.vcpus
4987 if self.do_ip or self.do_bridge or self.mac:
4991 ip = self.instance.nics[0].ip
4993 bridge = self.bridge
4995 bridge = self.instance.nics[0].bridge
4999 mac = self.instance.nics[0].mac
5000 args['nics'] = [(ip, bridge, mac)]
5001 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
5002 nl = [self.sstore.GetMasterNode(),
5003 self.instance.primary_node] + list(self.instance.secondary_nodes)
5006 def CheckPrereq(self):
5007 """Check prerequisites.
5009 This only checks the instance list against the existing names.
5012 self.mem = getattr(self.op, "mem", None)
5013 self.vcpus = getattr(self.op, "vcpus", None)
5014 self.ip = getattr(self.op, "ip", None)
5015 self.mac = getattr(self.op, "mac", None)
5016 self.bridge = getattr(self.op, "bridge", None)
5017 self.kernel_path = getattr(self.op, "kernel_path", None)
5018 self.initrd_path = getattr(self.op, "initrd_path", None)
5019 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
5020 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
5021 self.hvm_pae = getattr(self.op, "hvm_pae", None)
5022 self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
5023 self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
5024 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
5025 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
5026 self.force = getattr(self.op, "force", None)
5027 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
5028 self.kernel_path, self.initrd_path, self.hvm_boot_order,
5029 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
5030 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
5031 if all_parms.count(None) == len(all_parms):
5032 raise errors.OpPrereqError("No changes submitted")
5033 if self.mem is not None:
5035 self.mem = int(self.mem)
5036 except ValueError, err:
5037 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
5038 if self.vcpus is not None:
5040 self.vcpus = int(self.vcpus)
5041 except ValueError, err:
5042 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
5043 if self.ip is not None:
5045 if self.ip.lower() == "none":
5048 if not utils.IsValidIP(self.ip):
5049 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
5052 self.do_bridge = (self.bridge is not None)
5053 if self.mac is not None:
5054 if self.cfg.IsMacInUse(self.mac):
5055 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
5057 if not utils.IsValidMac(self.mac):
5058 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
5060 if self.kernel_path is not None:
5061 self.do_kernel_path = True
5062 if self.kernel_path == constants.VALUE_NONE:
5063 raise errors.OpPrereqError("Can't set instance to no kernel")
5065 if self.kernel_path != constants.VALUE_DEFAULT:
5066 if not os.path.isabs(self.kernel_path):
5067 raise errors.OpPrereqError("The kernel path must be an absolute"
5070 self.do_kernel_path = False
5072 if self.initrd_path is not None:
5073 self.do_initrd_path = True
5074 if self.initrd_path not in (constants.VALUE_NONE,
5075 constants.VALUE_DEFAULT):
5076 if not os.path.isabs(self.initrd_path):
5077 raise errors.OpPrereqError("The initrd path must be an absolute"
5080 self.do_initrd_path = False
5082 # boot order verification
5083 if self.hvm_boot_order is not None:
5084 if self.hvm_boot_order != constants.VALUE_DEFAULT:
5085 if len(self.hvm_boot_order.strip("acdn")) != 0:
5086 raise errors.OpPrereqError("invalid boot order specified,"
5087 " must be one or more of [acdn]"
5090 # hvm_cdrom_image_path verification
5091 if self.op.hvm_cdrom_image_path is not None:
5092 if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
5093 self.op.hvm_cdrom_image_path.lower() == "none"):
5094 raise errors.OpPrereqError("The path to the HVM CDROM image must"
5095 " be an absolute path or None, not %s" %
5096 self.op.hvm_cdrom_image_path)
5097 if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
5098 self.op.hvm_cdrom_image_path.lower() == "none"):
5099 raise errors.OpPrereqError("The HVM CDROM image must either be a"
5100 " regular file or a symlink pointing to"
5101 " an existing regular file, not %s" %
5102 self.op.hvm_cdrom_image_path)
5104 # vnc_bind_address verification
5105 if self.op.vnc_bind_address is not None:
5106 if not utils.IsValidIP(self.op.vnc_bind_address):
5107 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
5108 " like a valid IP address" %
5109 self.op.vnc_bind_address)
5111 # Xen HVM device type checks
5112 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
5113 if self.op.hvm_nic_type is not None:
5114 if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
5115 raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
5116 " HVM hypervisor" % self.op.hvm_nic_type)
5117 if self.op.hvm_disk_type is not None:
5118 if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
5119 raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
5120 " HVM hypervisor" % self.op.hvm_disk_type)
5122 instance = self.cfg.GetInstanceInfo(
5123 self.cfg.ExpandInstanceName(self.op.instance_name))
5124 if instance is None:
5125 raise errors.OpPrereqError("No such instance name '%s'" %
5126 self.op.instance_name)
5127 self.op.instance_name = instance.name
5128 self.instance = instance
5130 if self.mem is not None and not self.force:
5131 pnode = self.instance.primary_node
5133 nodelist.extend(instance.secondary_nodes)
5134 instance_info = rpc.call_instance_info(pnode, instance.name)
5135 nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
5137 if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
5138 # Assume the primary node is unreachable and go ahead
5139 self.warn.append("Can't get info from primary node %s" % pnode)
5142 current_mem = instance_info['memory']
5144 # Assume instance not running
5145 # (there is a slight race condition here, but it's not very probable,
5146 # and we have no other way to check)
5148 miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
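# Hypothetical example: growing the instance to 2048 MB when it currently
# uses 512 MB and the primary node reports 1024 MB free gives
# 2048 - 512 - 1024 = 512 MB missing, so the change is refused unless the
# force flag was given.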
5150 raise errors.OpPrereqError("This change will prevent the instance"
5151 " from starting, due to %d MB of memory"
5152 " missing on its primary node" % miss_mem)
5154 for node in instance.secondary_nodes:
5155 if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
5156 self.warn.append("Can't get info from secondary node %s" % node)
5157 elif self.mem > nodeinfo[node]['memory_free']:
5158 self.warn.append("Not enough memory to failover instance to secondary"
5162 def Exec(self, feedback_fn):
5163 """Modifies an instance.
5165 All parameters take effect only at the next restart of the instance.
5167 # Process here the warnings from CheckPrereq, as we don't have a
5168 # feedback_fn there.
5169 for warn in self.warn:
5170 feedback_fn("WARNING: %s" % warn)
5173 instance = self.instance
5175 instance.memory = self.mem
5176 result.append(("mem", self.mem))
5178 instance.vcpus = self.vcpus
5179 result.append(("vcpus", self.vcpus))
5181 instance.nics[0].ip = self.ip
5182 result.append(("ip", self.ip))
5184 instance.nics[0].bridge = self.bridge
5185 result.append(("bridge", self.bridge))
5187 instance.nics[0].mac = self.mac
5188 result.append(("mac", self.mac))
5189 if self.do_kernel_path:
5190 instance.kernel_path = self.kernel_path
5191 result.append(("kernel_path", self.kernel_path))
5192 if self.do_initrd_path:
5193 instance.initrd_path = self.initrd_path
5194 result.append(("initrd_path", self.initrd_path))
5195 if self.hvm_boot_order:
5196 if self.hvm_boot_order == constants.VALUE_DEFAULT:
5197 instance.hvm_boot_order = None
5199 instance.hvm_boot_order = self.hvm_boot_order
5200 result.append(("hvm_boot_order", self.hvm_boot_order))
5201 if self.hvm_acpi is not None:
5202 instance.hvm_acpi = self.hvm_acpi
5203 result.append(("hvm_acpi", self.hvm_acpi))
5204 if self.hvm_pae is not None:
5205 instance.hvm_pae = self.hvm_pae
5206 result.append(("hvm_pae", self.hvm_pae))
5207 if self.hvm_nic_type is not None:
5208 instance.hvm_nic_type = self.hvm_nic_type
5209 result.append(("hvm_nic_type", self.hvm_nic_type))
5210 if self.hvm_disk_type is not None:
5211 instance.hvm_disk_type = self.hvm_disk_type
5212 result.append(("hvm_disk_type", self.hvm_disk_type))
5213 if self.hvm_cdrom_image_path:
5214 if self.hvm_cdrom_image_path == constants.VALUE_NONE:
5215 instance.hvm_cdrom_image_path = None
5217 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
5218 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
5219 if self.vnc_bind_address:
5220 instance.vnc_bind_address = self.vnc_bind_address
5221 result.append(("vnc_bind_address", self.vnc_bind_address))
5223 self.cfg.AddInstance(instance)
5228 class LUQueryExports(NoHooksLU):
5229 """Query the exports list
5234 def CheckPrereq(self):
5235 """Check that the nodelist contains only existing nodes.
5238 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
5240 def Exec(self, feedback_fn):
5241 """Compute the list of all the exported system images.
5244 a dictionary with the structure node->(export-list)
5245 where export-list is a list of the instances exported on
5249 return rpc.call_export_list(self.nodes)
5252 class LUExportInstance(LogicalUnit):
5253 """Export an instance to an image in the cluster.
5256 HPATH = "instance-export"
5257 HTYPE = constants.HTYPE_INSTANCE
5258 _OP_REQP = ["instance_name", "target_node", "shutdown"]
5260 def BuildHooksEnv(self):
5263 This will run on the master, primary node and target node.
5267 "EXPORT_NODE": self.op.target_node,
5268 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5270 env.update(_BuildInstanceHookEnvByObject(self.instance))
5271 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
5272 self.op.target_node]
5275 def CheckPrereq(self):
5276 """Check prerequisites.
5278 This checks that the instance and node names are valid.
5281 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5282 self.instance = self.cfg.GetInstanceInfo(instance_name)
5283 if self.instance is None:
5284 raise errors.OpPrereqError("Instance '%s' not found" %
5285 self.op.instance_name)
5288 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
5289 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
5291 if self.dst_node is None:
5292 raise errors.OpPrereqError("Destination node '%s' is unknown." %
5293 self.op.target_node)
5294 self.op.target_node = self.dst_node.name
5296 def Exec(self, feedback_fn):
5297 """Export an instance to an image in the cluster.
5300 instance = self.instance
5301 dst_node = self.dst_node
5302 src_node = instance.primary_node
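# Overall flow, as implemented below: optionally shut the instance down,
# snapshot every disk on the primary node, restart the instance if it was
# running, copy each snapshot to the target node (removing the snapshot
# afterwards), finalize the export on the target node, and finally prune
# older exports of this instance from the remaining nodes.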
5303 if self.op.shutdown:
5304 # shutdown the instance, but not the disks
5305 if not rpc.call_instance_shutdown(src_node, instance):
5306 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5307 (instance.name, src_node))
5309 vgname = self.cfg.GetVGName()
5314 for disk in instance.disks:
5315 if disk.iv_name == "sda":
5316 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5317 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
5319 if not new_dev_name:
5320 logger.Error("could not snapshot block device %s on node %s" %
5321 (disk.logical_id[1], src_node))
5323 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5324 logical_id=(vgname, new_dev_name),
5325 physical_id=(vgname, new_dev_name),
5326 iv_name=disk.iv_name)
5327 snap_disks.append(new_dev)
5330 if self.op.shutdown and instance.status == "up":
5331 if not rpc.call_instance_start(src_node, instance, None):
5332 _ShutdownInstanceDisks(instance, self.cfg)
5333 raise errors.OpExecError("Could not start instance")
5335 # TODO: check for size
5337 for dev in snap_disks:
5338 if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
5340 logger.Error("could not export block device %s from node"
5342 (dev.logical_id[1], src_node, dst_node.name))
5343 if not rpc.call_blockdev_remove(src_node, dev):
5344 logger.Error("could not remove snapshot block device %s from"
5345 " node %s" % (dev.logical_id[1], src_node))
5347 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5348 logger.Error("could not finalize export for instance %s on node %s" %
5349 (instance.name, dst_node.name))
5351 nodelist = self.cfg.GetNodeList()
5352 nodelist.remove(dst_node.name)
5354 # on one-node clusters nodelist will be empty after the removal
5355 # if we proceed the backup would be removed because OpQueryExports
5356 # substitutes an empty list with the full cluster node list.
5358 op = opcodes.OpQueryExports(nodes=nodelist)
5359 exportlist = self.proc.ChainOpCode(op)
5360 for node in exportlist:
5361 if instance.name in exportlist[node]:
5362 if not rpc.call_export_remove(node, instance.name):
5363 logger.Error("could not remove older export for instance %s"
5364 " on node %s" % (instance.name, node))
5367 class LURemoveExport(NoHooksLU):
5368 """Remove exports related to the named instance.
5371 _OP_REQP = ["instance_name"]
5373 def CheckPrereq(self):
5374 """Check prerequisites.
5378 def Exec(self, feedback_fn):
5379 """Remove any export.
5382 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5383 # If the instance was not found we'll try with the name that was passed in.
5384 # This will only work if it was an FQDN, though.
5386 if not instance_name:
5388 instance_name = self.op.instance_name
5390 op = opcodes.OpQueryExports(nodes=[])
5391 exportlist = self.proc.ChainOpCode(op)
5393 for node in exportlist:
5394 if instance_name in exportlist[node]:
5396 if not rpc.call_export_remove(node, instance_name):
5397 logger.Error("could not remove export for instance %s"
5398 " on node %s" % (instance_name, node))
5400 if fqdn_warn and not found:
5401 feedback_fn("Export not found. If trying to remove an export belonging"
5402 " to a deleted instance please use its Fully Qualified Domain Name.")
5406 class TagsLU(NoHooksLU):
5409 This is an abstract class which is the parent of all the other tags LUs.
5412 def CheckPrereq(self):
5413 """Check prerequisites.
5416 if self.op.kind == constants.TAG_CLUSTER:
5417 self.target = self.cfg.GetClusterInfo()
5418 elif self.op.kind == constants.TAG_NODE:
5419 name = self.cfg.ExpandNodeName(self.op.name)
5421 raise errors.OpPrereqError("Invalid node name (%s)" %
5424 self.target = self.cfg.GetNodeInfo(name)
5425 elif self.op.kind == constants.TAG_INSTANCE:
5426 name = self.cfg.ExpandInstanceName(self.op.name)
5428 raise errors.OpPrereqError("Invalid instance name (%s)" %
5431 self.target = self.cfg.GetInstanceInfo(name)
5433 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5437 class LUGetTags(TagsLU):
5438 """Returns the tags of a given object.
5441 _OP_REQP = ["kind", "name"]
5443 def Exec(self, feedback_fn):
5444 """Returns the tag list.
5447 return self.target.GetTags()
5450 class LUSearchTags(NoHooksLU):
5451 """Searches the tags for a given pattern.
5454 _OP_REQP = ["pattern"]
5456 def CheckPrereq(self):
5457 """Check prerequisites.
5459 This checks the pattern passed for validity by compiling it.
5463 self.re = re.compile(self.op.pattern)
5464 except re.error, err:
5465 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5466 (self.op.pattern, err))
5468 def Exec(self, feedback_fn):
5469 """Returns the tag list.
5473 tgts = [("/cluster", cfg.GetClusterInfo())]
5474 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
5475 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5476 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
5477 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5479 for path, target in tgts:
5480 for tag in target.GetTags():
5481 if self.re.search(tag):
5482 results.append((path, tag))
5486 class LUAddTags(TagsLU):
5487 """Sets a tag on a given object.
5490 _OP_REQP = ["kind", "name", "tags"]
5492 def CheckPrereq(self):
5493 """Check prerequisites.
5495 This checks the type and length of the tag name and value.
5498 TagsLU.CheckPrereq(self)
5499 for tag in self.op.tags:
5500 objects.TaggableObject.ValidateTag(tag)
5502 def Exec(self, feedback_fn):
5507 for tag in self.op.tags:
5508 self.target.AddTag(tag)
5509 except errors.TagError, err:
5510 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5512 self.cfg.Update(self.target)
5513 except errors.ConfigurationError:
5514 raise errors.OpRetryError("There has been a modification to the"
5515 " config file and the operation has been"
5516 " aborted. Please retry.")
5519 class LUDelTags(TagsLU):
5520 """Delete a list of tags from a given object.
5523 _OP_REQP = ["kind", "name", "tags"]
5525 def CheckPrereq(self):
5526 """Check prerequisites.
5528 This checks that we have the given tag.
5531 TagsLU.CheckPrereq(self)
5532 for tag in self.op.tags:
5533 objects.TaggableObject.ValidateTag(tag, removal=True)
5534 del_tags = frozenset(self.op.tags)
5535 cur_tags = self.target.GetTags()
5536 if not del_tags <= cur_tags:
5537 diff_tags = del_tags - cur_tags
5538 diff_names = ["'%s'" % tag for tag in diff_tags]
5540 raise errors.OpPrereqError("Tag(s) %s not found" %
5541 (",".join(diff_names)))
5543 def Exec(self, feedback_fn):
5544 """Remove the tag from the object.
5547 for tag in self.op.tags:
5548 self.target.RemoveTag(tag)
5550 self.cfg.Update(self.target)
5551 except errors.ConfigurationError:
5552 raise errors.OpRetryError("There has been a modification to the"
5553 " config file and the operation has been"
5554 " aborted. Please retry.")
5556 class LUTestDelay(NoHooksLU):
5557 """Sleep for a specified amount of time.
5559 This LU sleeps on the master and/or nodes for a specified amount of time.
5563 _OP_REQP = ["duration", "on_master", "on_nodes"]
5565 def CheckPrereq(self):
5566 """Check prerequisites.
5568 This checks that we have a good list of nodes and/or the duration
5573 if self.op.on_nodes:
5574 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5576 def Exec(self, feedback_fn):
5577 """Do the actual sleep.
5580 if self.op.on_master:
5581 if not utils.TestDelay(self.op.duration):
5582 raise errors.OpExecError("Error during master delay test")
5583 if self.op.on_nodes:
5584 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5586 raise errors.OpExecError("Complete failure from rpc call")
5587 for node, node_result in result.items():
5589 raise errors.OpExecError("Failure during rpc call to node %s,"
5590 " result: %s" % (node, node_result))
5593 class IAllocator(object):
5594 """IAllocator framework.
5596 An IAllocator instance has the following sets of attributes:
5597 - cfg/sstore that are needed to query the cluster
5598 - input data (all members of the _KEYS class attribute are required)
5599 - four buffer attributes (in|out_data|text), that represent the
5600 input (to the external script) in text and data structure format,
5601 and the output from it, again in two formats
5602 - the result variables from the script (success, info, nodes) for easy usage
5607 "mem_size", "disks", "disk_template",
5608 "os", "tags", "nics", "vcpus",
5614 def __init__(self, cfg, sstore, mode, name, **kwargs):
5616 self.sstore = sstore
5617 # init buffer variables
5618 self.in_text = self.out_text = self.in_data = self.out_data = None
5619 # init all input fields so that pylint is happy
5622 self.mem_size = self.disks = self.disk_template = None
5623 self.os = self.tags = self.nics = self.vcpus = None
5624 self.relocate_from = None
5626 self.required_nodes = None
5627 # init result fields
5628 self.success = self.info = self.nodes = None
5629 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5630 keyset = self._ALLO_KEYS
5631 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5632 keyset = self._RELO_KEYS
5634 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5635 " IAllocator" % self.mode)
5637 if key not in keyset:
5638 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5639 " IAllocator" % key)
5640 setattr(self, key, kwargs[key])
5642 if key not in kwargs:
5643 raise errors.ProgrammerError("Missing input parameter '%s' to"
5644 " IAllocator" % key)
5645 self._BuildInputData()
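# Typical use (cf. LUReplaceDisks._RunAllocator above; names illustrative):
#   ial = IAllocator(cfg, sstore, mode=constants.IALLOCATOR_MODE_RELOC,
#                    name=instance_name, relocate_from=[old_secondary])
#   ial.Run(allocator_name)
#   # on success, ial.success is true and ial.nodes holds the chosen nodes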
5647 def _ComputeClusterData(self):
5648 """Compute the generic allocator input data.
5650 This is the data that is independent of the actual operation.
5657 "cluster_name": self.sstore.GetClusterName(),
5658 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5659 "hypervisor_type": self.sstore.GetHypervisorType(),
5660 # we don't have job IDs
5663 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5667 node_list = cfg.GetNodeList()
5668 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5669 for nname in node_list:
5670 ninfo = cfg.GetNodeInfo(nname)
5671 if nname not in node_data or not isinstance(node_data[nname], dict):
5672 raise errors.OpExecError("Can't get data for node %s" % nname)
5673 remote_info = node_data[nname]
5674 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5675 'vg_size', 'vg_free', 'cpu_total']:
5676 if attr not in remote_info:
5677 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5680 remote_info[attr] = int(remote_info[attr])
5681 except ValueError, err:
5682 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5683 " %s" % (nname, attr, str(err)))
5684 # compute memory used by primary instances
5685 i_p_mem = i_p_up_mem = 0
5686 for iinfo in i_list:
5687 if iinfo.primary_node == nname:
5688 i_p_mem += iinfo.memory
5689 if iinfo.status == "up":
5690 i_p_up_mem += iinfo.memory
5692 # compute memory used by instances
5694 "tags": list(ninfo.GetTags()),
5695 "total_memory": remote_info['memory_total'],
5696 "reserved_memory": remote_info['memory_dom0'],
5697 "free_memory": remote_info['memory_free'],
5698 "i_pri_memory": i_p_mem,
5699 "i_pri_up_memory": i_p_up_mem,
5700 "total_disk": remote_info['vg_size'],
5701 "free_disk": remote_info['vg_free'],
5702 "primary_ip": ninfo.primary_ip,
5703 "secondary_ip": ninfo.secondary_ip,
5704 "total_cpus": remote_info['cpu_total'],
5706 node_results[nname] = pnr
5707 data["nodes"] = node_results
5711 for iinfo in i_list:
5712 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5713 for n in iinfo.nics]
5715 "tags": list(iinfo.GetTags()),
5716 "should_run": iinfo.status == "up",
5717 "vcpus": iinfo.vcpus,
5718 "memory": iinfo.memory,
5720 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5722 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5723 "disk_template": iinfo.disk_template,
5725 instance_data[iinfo.name] = pir
5727 data["instances"] = instance_data
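# Each entry built above ends up looking roughly like (values illustrative):
#   "instance1.example.com": {"tags": [], "should_run": True, "vcpus": 1,
#                             "memory": 512,
#                             "nodes": ["node1.example.com", "node2.example.com"],
#                             "disks": [{"size": 10240, "mode": "w"}],
#                             "disk_template": "<template>", ...}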
5731 def _AddNewInstance(self):
5732 """Add new instance data to allocator structure.
5734 This in combination with _AllocatorGetClusterData will create the
5735 correct structure needed as input for the allocator.
5737 The checks for the completeness of the opcode must have already been
5742 if len(self.disks) != 2:
5743 raise errors.OpExecError("Only two-disk configurations supported")
5745 disk_space = _ComputeDiskSize(self.disk_template,
5746 self.disks[0]["size"], self.disks[1]["size"])
5748 if self.disk_template in constants.DTS_NET_MIRROR:
5749 self.required_nodes = 2
5751 self.required_nodes = 1
5755 "disk_template": self.disk_template,
5758 "vcpus": self.vcpus,
5759 "memory": self.mem_size,
5760 "disks": self.disks,
5761 "disk_space_total": disk_space,
5763 "required_nodes": self.required_nodes,
5765 data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
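
    # For illustration only (names hypothetical): the relocation request
    # added above would look roughly like
    #   {"type": "relocate", "name": "web1.example.com",
    #    "disk_space_total": <result of _ComputeDiskSize>,
    #    "required_nodes": 1, "relocate_from": ["node2.example.com"]}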

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
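    # Note: self.in_text is the serialized form of self.in_data, i.e. one
    # document holding the "nodes" and "instances" maps built by
    # _ComputeClusterData plus the "request" dict added by one of the two
    # methods above; this is what is handed to the allocator script.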

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout + stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will parse the allocator output and, if it is valid, save the
    result in self.out_data and in the 'success', 'info' and 'nodes'
    attributes.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
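
    # For illustration only: a well-formed, successful allocator answer
    # would look roughly like
    #   {"success": true, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}
    # while a failed run would carry "success": false plus an explanatory
    # "info" string.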


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus)
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from))

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text