4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement CheckPrereq which also fills in the opcode instance
53 with all the fields (even if as None)
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements (REQ_CLUSTER,
58 REQ_MASTER); note that all commands require root permissions
67 def __init__(self, processor, op, cfg, sstore):
68 """Constructor for LogicalUnit.
70 This needs to be overriden in derived classes in order to check op
78 for attr_name in self._OP_REQP:
79 attr_val = getattr(op, attr_name, None)
81 raise errors.OpPrereqError("Required parameter '%s' missing" %
84 if not cfg.IsCluster():
85 raise errors.OpPrereqError("Cluster not initialized yet,"
86 " use 'gnt-cluster init' first.")
88 master = sstore.GetMasterNode()
89 if master != utils.HostInfo().name:
90 raise errors.OpPrereqError("Commands must be run on the master"
93 def CheckPrereq(self):
94 """Check prerequisites for this LU.
96 This method should check that the prerequisites for the execution
97 of this LU are fulfilled. It can do internode communication, but
98 it should be idempotent - no cluster or system changes are
101 The method should raise errors.OpPrereqError in case something is
102 not fulfilled. Its return value is ignored.
104 This method should also update all the parameters of the opcode to
105 their canonical form; e.g. a short node name must be fully
106 expanded after this method has successfully completed (so that
107 hooks, logging, etc. work correctly).
110 raise NotImplementedError
112 def Exec(self, feedback_fn):
115 This method should implement the actual work. It should raise
116 errors.OpExecError for failures that are somewhat dealt with in
120 raise NotImplementedError
122 def BuildHooksEnv(self):
123 """Build hooks environment for this LU.
125 This method should return a three-node tuple consisting of: a dict
126 containing the environment that will be used for running the
127 specific hook for this LU, a list of node names on which the hook
128 should run before the execution, and a list of node names on which
129 the hook should run after the execution.
131 The keys of the dict must not have 'GANETI_' prefixed as this will
132 be handled in the hooks runner. Also note additional keys will be
133 added by the hooks runner. If the LU doesn't define any
134 environment, an empty dict (and not None) should be returned.
136 No nodes should be returned as an empty list (and not None).
138 Note that if the HPATH for a LU class is None, this function will
142 raise NotImplementedError
144 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
145 """Notify the LU about the results of its hooks.
147 This method is called every time a hooks phase is executed, and notifies
148 the Logical Unit about the hooks' result. The LU can then use it to alter
149 its result based on the hooks. By default the method does nothing and the
150 previous result is passed back unchanged but any LU can define it if it
151 wants to use the local cluster hook-scripts somehow.
154 phase: the hooks phase that has just been run
155 hooks_results: the results of the multi-node hooks rpc call
156 feedback_fn: function to send feedback back to the caller
157 lu_result: the previous result this LU had, or None in the PRE phase.
163 class NoHooksLU(LogicalUnit):
164 """Simple LU which runs no hooks.
166 This LU is intended as a parent for other LogicalUnits which will
167 run no hooks, in order to reduce duplicate code.
174 def _AddHostToEtcHosts(hostname):
175 """Wrapper around utils.SetEtcHostsEntry.
178 hi = utils.HostInfo(name=hostname)
179 utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
182 def _RemoveHostFromEtcHosts(hostname):
183 """Wrapper around utils.RemoveEtcHostsEntry.
186 hi = utils.HostInfo(name=hostname)
187 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
188 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
191 def _GetWantedNodes(lu, nodes):
192 """Returns list of checked and expanded node names.
195 nodes: List of nodes (strings) or None for all
198 if not isinstance(nodes, list):
199 raise errors.OpPrereqError("Invalid argument type 'nodes'")
205 node = lu.cfg.ExpandNodeName(name)
207 raise errors.OpPrereqError("No such node name '%s'" % name)
211 wanted = lu.cfg.GetNodeList()
212 return utils.NiceSort(wanted)
215 def _GetWantedInstances(lu, instances):
216 """Returns list of checked and expanded instance names.
219 instances: List of instances (strings) or None for all
222 if not isinstance(instances, list):
223 raise errors.OpPrereqError("Invalid argument type 'instances'")
228 for name in instances:
229 instance = lu.cfg.ExpandInstanceName(name)
231 raise errors.OpPrereqError("No such instance name '%s'" % name)
232 wanted.append(instance)
235 wanted = lu.cfg.GetInstanceList()
236 return utils.NiceSort(wanted)
239 def _CheckOutputFields(static, dynamic, selected):
240 """Checks whether all selected fields are valid.
243 static: Static fields
244 dynamic: Dynamic fields
247 static_fields = frozenset(static)
248 dynamic_fields = frozenset(dynamic)
250 all_fields = static_fields | dynamic_fields
252 if not all_fields.issuperset(selected):
253 raise errors.OpPrereqError("Unknown output fields selected: %s"
254 % ",".join(frozenset(selected).
255 difference(all_fields)))
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259 memory, vcpus, nics):
260 """Builds instance related env variables for hooks from single variables.
263 secondary_nodes: List of secondary nodes as strings
267 "INSTANCE_NAME": name,
268 "INSTANCE_PRIMARY": primary_node,
269 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270 "INSTANCE_OS_TYPE": os_type,
271 "INSTANCE_STATUS": status,
272 "INSTANCE_MEMORY": memory,
273 "INSTANCE_VCPUS": vcpus,
277 nic_count = len(nics)
278 for idx, (ip, bridge, mac) in enumerate(nics):
281 env["INSTANCE_NIC%d_IP" % idx] = ip
282 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
287 env["INSTANCE_NIC_COUNT"] = nic_count
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293 """Builds instance related env variables for hooks from an object.
296 instance: objects.Instance object of instance
297 override: dict of values to override
300 'name': instance.name,
301 'primary_node': instance.primary_node,
302 'secondary_nodes': instance.secondary_nodes,
303 'os_type': instance.os,
304 'status': instance.os,
305 'memory': instance.memory,
306 'vcpus': instance.vcpus,
307 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
310 args.update(override)
311 return _BuildInstanceHookEnv(**args)
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315 """Ensure a node has a correct known_hosts entry.
318 fullnode - Fully qualified domain name of host. (str)
319 ip - IPv4 address of host (str)
320 pubkey - the public key of the cluster
323 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
326 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
335 logger.Debug('read %s' % (repr(rawline),))
337 parts = rawline.rstrip('\r\n').split()
339 # Ignore unwanted lines
340 if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341 fields = parts[0].split(',')
346 for spec in [ ip, fullnode ]:
347 if spec not in fields:
352 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353 if haveall and key == pubkey:
355 save_lines.append(rawline)
356 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
359 if havesome and (not haveall or key != pubkey):
361 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
364 save_lines.append(rawline)
367 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
371 save_lines = save_lines + add_lines
373 # Write a new file and replace old.
374 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
376 newfile = os.fdopen(fd, 'w')
378 newfile.write(''.join(save_lines))
381 logger.Debug("Wrote new known_hosts.")
382 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
385 # Simply appending a new line will do the trick.
387 for add in add_lines:
393 def _HasValidVG(vglist, vgname):
394 """Checks if the volume group list is valid.
396 A non-None return value means there's an error, and the return value
397 is the error message.
400 vgsize = vglist.get(vgname, None)
402 return "volume group '%s' missing" % vgname
404 return ("volume group '%s' too small (20480MiB required, %dMib found)" %
409 def _InitSSHSetup(node):
410 """Setup the SSH configuration for the cluster.
413 This generates a dsa keypair for root, adds the pub key to the
414 permitted hosts and adds the hostkey to its own known hosts.
417 node: the name of this host as a fqdn
420 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
422 for name in priv_key, pub_key:
423 if os.path.exists(name):
424 utils.CreateBackup(name)
425 utils.RemoveFile(name)
427 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
431 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
434 f = open(pub_key, 'r')
436 utils.AddAuthorizedKey(auth_keys, f.read(8192))
441 def _InitGanetiServerSetup(ss):
442 """Setup the necessary configuration for the initial node daemon.
444 This creates the nodepass file containing the shared password for
445 the cluster and also generates the SSL certificate.
448 # Create pseudo random password
449 randpass = sha.new(os.urandom(64)).hexdigest()
450 # and write it into sstore
451 ss.SetKey(ss.SS_NODED_PASS, randpass)
453 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454 "-days", str(365*5), "-nodes", "-x509",
455 "-keyout", constants.SSL_CERT_FILE,
456 "-out", constants.SSL_CERT_FILE, "-batch"])
458 raise errors.OpExecError("could not generate server ssl cert, command"
459 " %s had exitcode %s and error message %s" %
460 (result.cmd, result.exit_code, result.output))
462 os.chmod(constants.SSL_CERT_FILE, 0400)
464 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
467 raise errors.OpExecError("Could not start the node daemon, command %s"
468 " had exitcode %s and error %s" %
469 (result.cmd, result.exit_code, result.output))
472 def _CheckInstanceBridgesExist(instance):
473 """Check that the brigdes needed by an instance exist.
476 # check bridges existance
477 brlist = [nic.bridge for nic in instance.nics]
478 if not rpc.call_bridges_exist(instance.primary_node, brlist):
479 raise errors.OpPrereqError("one or more target bridges %s does not"
480 " exist on destination node '%s'" %
481 (brlist, instance.primary_node))
484 class LUInitCluster(LogicalUnit):
485 """Initialise the cluster.
488 HPATH = "cluster-init"
489 HTYPE = constants.HTYPE_CLUSTER
490 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491 "def_bridge", "master_netdev"]
494 def BuildHooksEnv(self):
497 Notes: Since we don't require a cluster, we must manually add
498 ourselves in the post-run node list.
501 env = {"OP_TARGET": self.op.cluster_name}
502 return env, [], [self.hostname.name]
504 def CheckPrereq(self):
505 """Verify that the passed name is a valid one.
508 if config.ConfigWriter.IsCluster():
509 raise errors.OpPrereqError("Cluster is already initialised")
511 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512 if not os.path.exists(constants.VNC_PASSWORD_FILE):
513 raise errors.OpPrereqError("Please prepare the cluster VNC"
515 constants.VNC_PASSWORD_FILE)
517 self.hostname = hostname = utils.HostInfo()
519 if hostname.ip.startswith("127."):
520 raise errors.OpPrereqError("This host's IP resolves to the private"
521 " range (%s). Please fix DNS or %s." %
522 (hostname.ip, constants.ETC_HOSTS))
524 if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525 source=constants.LOCALHOST_IP_ADDRESS):
526 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527 " to %s,\nbut this ip address does not"
528 " belong to this host."
529 " Aborting." % hostname.ip)
531 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
533 if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
535 raise errors.OpPrereqError("Cluster IP already active. Aborting.")
537 secondary_ip = getattr(self.op, "secondary_ip", None)
538 if secondary_ip and not utils.IsValidIP(secondary_ip):
539 raise errors.OpPrereqError("Invalid secondary ip given")
541 secondary_ip != hostname.ip and
542 (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543 source=constants.LOCALHOST_IP_ADDRESS))):
544 raise errors.OpPrereqError("You gave %s as secondary IP,"
545 " but it does not belong to this host." %
547 self.secondary_ip = secondary_ip
549 # checks presence of the volume group given
550 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
553 raise errors.OpPrereqError("Error: %s" % vgstatus)
555 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
557 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
560 if self.op.hypervisor_type not in constants.HYPER_TYPES:
561 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562 self.op.hypervisor_type)
564 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
566 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567 (self.op.master_netdev,
568 result.output.strip()))
570 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572 raise errors.OpPrereqError("Init.d script '%s' missing or not"
573 " executable." % constants.NODE_INITD_SCRIPT)
575 def Exec(self, feedback_fn):
576 """Initialize the cluster.
579 clustername = self.clustername
580 hostname = self.hostname
582 # set up the simple store
583 self.sstore = ss = ssconf.SimpleStore()
584 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
590 # set up the inter-node password and certificate
591 _InitGanetiServerSetup(ss)
593 # start the master ip
594 rpc.call_node_start_master(hostname.name)
596 # set up ssh config and /etc/hosts
597 f = open(constants.SSH_HOST_RSA_PUB, 'r')
602 sshkey = sshline.split(" ")[1]
604 _AddHostToEtcHosts(hostname.name)
606 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
608 _InitSSHSetup(hostname.name)
610 # init of cluster config file
611 self.cfg = cfgw = config.ConfigWriter()
612 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613 sshkey, self.op.mac_prefix,
614 self.op.vg_name, self.op.def_bridge)
617 class LUDestroyCluster(NoHooksLU):
618 """Logical unit for destroying the cluster.
623 def CheckPrereq(self):
624 """Check prerequisites.
626 This checks whether the cluster is empty.
628 Any errors are signalled by raising errors.OpPrereqError.
631 master = self.sstore.GetMasterNode()
633 nodelist = self.cfg.GetNodeList()
634 if len(nodelist) != 1 or nodelist[0] != master:
635 raise errors.OpPrereqError("There are still %d node(s) in"
636 " this cluster." % (len(nodelist) - 1))
637 instancelist = self.cfg.GetInstanceList()
639 raise errors.OpPrereqError("There are still %d instance(s) in"
640 " this cluster." % len(instancelist))
642 def Exec(self, feedback_fn):
643 """Destroys the cluster.
646 master = self.sstore.GetMasterNode()
647 if not rpc.call_node_stop_master(master):
648 raise errors.OpExecError("Could not disable the master role")
649 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650 utils.CreateBackup(priv_key)
651 utils.CreateBackup(pub_key)
652 rpc.call_node_leave_cluster(master)
655 class LUVerifyCluster(LogicalUnit):
656 """Verifies the cluster status.
659 HPATH = "cluster-verify"
660 HTYPE = constants.HTYPE_CLUSTER
661 _OP_REQP = ["skip_checks"]
663 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664 remote_version, feedback_fn):
665 """Run multiple tests against a node.
668 - compares ganeti version
669 - checks vg existance and size > 20G
670 - checks config file checksum
671 - checks ssh to other nodes
674 node: name of the node to check
675 file_list: required list of files
676 local_cksum: dictionary of local files and their checksums
679 # compares ganeti version
680 local_version = constants.PROTOCOL_VERSION
681 if not remote_version:
682 feedback_fn(" - ERROR: connection to %s failed" % (node))
685 if local_version != remote_version:
686 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
687 (local_version, node, remote_version))
690 # checks vg existance and size > 20G
694 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
698 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
700 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
703 # checks config file checksum
706 if 'filelist' not in node_result:
708 feedback_fn(" - ERROR: node hasn't returned file checksum data")
710 remote_cksum = node_result['filelist']
711 for file_name in file_list:
712 if file_name not in remote_cksum:
714 feedback_fn(" - ERROR: file '%s' missing" % file_name)
715 elif remote_cksum[file_name] != local_cksum[file_name]:
717 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
719 if 'nodelist' not in node_result:
721 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
723 if node_result['nodelist']:
725 for node in node_result['nodelist']:
726 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
727 (node, node_result['nodelist'][node]))
728 if 'node-net-test' not in node_result:
730 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
732 if node_result['node-net-test']:
734 nlist = utils.NiceSort(node_result['node-net-test'].keys())
736 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
737 (node, node_result['node-net-test'][node]))
739 hyp_result = node_result.get('hypervisor', None)
740 if hyp_result is not None:
741 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
744 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
745 node_instance, feedback_fn):
746 """Verify an instance.
748 This function checks to see if the required block devices are
749 available on the instance's node.
754 node_current = instanceconfig.primary_node
757 instanceconfig.MapLVsByNode(node_vol_should)
759 for node in node_vol_should:
760 for volume in node_vol_should[node]:
761 if node not in node_vol_is or volume not in node_vol_is[node]:
762 feedback_fn(" - ERROR: volume %s missing on node %s" %
766 if not instanceconfig.status == 'down':
767 if (node_current not in node_instance or
768 not instance in node_instance[node_current]):
769 feedback_fn(" - ERROR: instance %s not running on node %s" %
770 (instance, node_current))
773 for node in node_instance:
774 if (not node == node_current):
775 if instance in node_instance[node]:
776 feedback_fn(" - ERROR: instance %s should not run on node %s" %
782 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
783 """Verify if there are any unknown volumes in the cluster.
785 The .os, .swap and backup volumes are ignored. All other volumes are
791 for node in node_vol_is:
792 for volume in node_vol_is[node]:
793 if node not in node_vol_should or volume not in node_vol_should[node]:
794 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
799 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
800 """Verify the list of running instances.
802 This checks what instances are running but unknown to the cluster.
806 for node in node_instance:
807 for runninginstance in node_instance[node]:
808 if runninginstance not in instancelist:
809 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
810 (runninginstance, node))
814 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
815 """Verify N+1 Memory Resilience.
817 Check that if one single node dies we can still start all the instances it
823 for node, nodeinfo in node_info.iteritems():
824 # This code checks that every node which is now listed as secondary has
825 # enough memory to host all instances it is supposed to should a single
826 # other node in the cluster fail.
827 # FIXME: not ready for failover to an arbitrary node
828 # FIXME: does not support file-backed instances
829 # WARNING: we currently take into account down instances as well as up
830 # ones, considering that even if they're down someone might want to start
831 # them even in the event of a node failure.
832 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
834 for instance in instances:
835 if instance_cfg[instance].auto_balance:
836 needed_mem += instance_cfg[instance].memory
837 if nodeinfo['mfree'] < needed_mem:
838 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
839 " failovers should node %s fail" % (node, prinode))
843 def CheckPrereq(self):
844 """Check prerequisites.
846 Transform the list of checks we're going to skip into a set and check that
847 all its members are valid.
850 self.skip_set = frozenset(self.op.skip_checks)
851 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
852 raise errors.OpPrereqError("Invalid checks to be skipped specified")
854 def BuildHooksEnv(self):
857 Cluster-Verify hooks just rone in the post phase and their failure makes
858 the output be logged in the verify output and the verification to fail.
861 all_nodes = self.cfg.GetNodeList()
862 tags = self.cfg.GetClusterInfo().GetTags()
863 # TODO: populate the environment with useful information for verify hooks
865 "CLUSTER_TAGS": " ".join(tags),
867 return env, [], all_nodes
869 def Exec(self, feedback_fn):
870 """Verify integrity of cluster, performing various test on nodes.
874 feedback_fn("* Verifying global settings")
875 for msg in self.cfg.VerifyConfig():
876 feedback_fn(" - ERROR: %s" % msg)
878 vg_name = self.cfg.GetVGName()
879 nodelist = utils.NiceSort(self.cfg.GetNodeList())
880 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
881 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
882 i_non_redundant = [] # Non redundant instances
883 i_non_a_balanced = [] # Non auto-balanced instances
889 # FIXME: verify OS list
891 file_names = list(self.sstore.GetFileList())
892 file_names.append(constants.SSL_CERT_FILE)
893 file_names.append(constants.CLUSTER_CONF_FILE)
894 local_checksums = utils.FingerprintFiles(file_names)
896 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
897 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
898 all_instanceinfo = rpc.call_instance_list(nodelist)
899 all_vglist = rpc.call_vg_list(nodelist)
900 node_verify_param = {
901 'filelist': file_names,
902 'nodelist': nodelist,
904 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
905 for node in nodeinfo]
907 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
908 all_rversion = rpc.call_version(nodelist)
909 all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
911 incomplete_nodeinfo = False
913 for node in nodelist:
914 feedback_fn("* Verifying node %s" % node)
915 result = self._VerifyNode(node, file_names, local_checksums,
916 all_vglist[node], all_nvinfo[node],
917 all_rversion[node], feedback_fn)
921 volumeinfo = all_volumeinfo[node]
923 if isinstance(volumeinfo, basestring):
924 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
925 (node, volumeinfo[-400:].encode('string_escape')))
927 node_volume[node] = {}
928 elif not isinstance(volumeinfo, dict):
929 feedback_fn(" - ERROR: connection to %s failed" % (node,))
931 incomplete_nodeinfo = True
934 node_volume[node] = volumeinfo
937 nodeinstance = all_instanceinfo[node]
938 if type(nodeinstance) != list:
939 feedback_fn(" - ERROR: connection to %s failed" % (node,))
941 incomplete_nodeinfo = True
944 node_instance[node] = nodeinstance
947 nodeinfo = all_ninfo[node]
948 if not isinstance(nodeinfo, dict):
949 feedback_fn(" - ERROR: connection to %s failed" % (node,))
951 incomplete_nodeinfo = True
956 "mfree": int(nodeinfo['memory_free']),
957 "dfree": int(nodeinfo['vg_free']),
960 # dictionary holding all instances this node is secondary for,
961 # grouped by their primary node. Each key is a cluster node, and each
962 # value is a list of instances which have the key as primary and the
963 # current node as secondary. this is handy to calculate N+1 memory
964 # availability if you can only failover from a primary to its
966 "sinst-by-pnode": {},
968 except (ValueError, TypeError):
969 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
971 incomplete_nodeinfo = True
976 for instance in instancelist:
977 feedback_fn("* Verifying instance %s" % instance)
978 inst_config = self.cfg.GetInstanceInfo(instance)
979 result = self._VerifyInstance(instance, inst_config, node_volume,
980 node_instance, feedback_fn)
983 inst_config.MapLVsByNode(node_vol_should)
985 instance_cfg[instance] = inst_config
987 pnode = inst_config.primary_node
988 if pnode in node_info:
989 node_info[pnode]['pinst'].append(instance)
991 feedback_fn(" - ERROR: instance %s, connection to primary node"
992 " %s failed" % (instance, pnode))
995 # If the instance is non-redundant we cannot survive losing its primary
996 # node, so we are not N+1 compliant. On the other hand we have no disk
997 # templates with more than one secondary so that situation is not well
999 # FIXME: does not support file-backed instances
1000 if len(inst_config.secondary_nodes) == 0:
1001 i_non_redundant.append(instance)
1002 elif len(inst_config.secondary_nodes) > 1:
1003 feedback_fn(" - WARNING: multiple secondaries for instance %s"
1006 if not inst_config.auto_balance:
1007 i_non_a_balanced.append(instance)
1009 for snode in inst_config.secondary_nodes:
1010 if snode in node_info:
1011 node_info[snode]['sinst'].append(instance)
1012 if pnode not in node_info[snode]['sinst-by-pnode']:
1013 node_info[snode]['sinst-by-pnode'][pnode] = []
1014 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1016 feedback_fn(" - ERROR: instance %s, connection to secondary node"
1017 " %s failed" % (instance, snode))
1019 feedback_fn("* Verifying orphan volumes")
1020 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1024 feedback_fn("* Verifying remaining instances")
1025 result = self._VerifyOrphanInstances(instancelist, node_instance,
1029 if (constants.VERIFY_NPLUSONE_MEM not in self.skip_set and
1030 not incomplete_nodeinfo):
1031 feedback_fn("* Verifying N+1 Memory redundancy")
1032 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1035 feedback_fn("* Other Notes")
1037 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
1038 % len(i_non_redundant))
1040 if i_non_a_balanced:
1041 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
1042 % len(i_non_a_balanced))
1046 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1047 """Analize the post-hooks' result, handle it, and send some
1048 nicely-formatted feedback back to the user.
1051 phase: the hooks phase that has just been run
1052 hooks_results: the results of the multi-node hooks rpc call
1053 feedback_fn: function to send feedback back to the caller
1054 lu_result: previous Exec result
1057 # We only really run POST phase hooks, and are only interested in their results
1058 if phase == constants.HOOKS_PHASE_POST:
1059 # Used to change hooks' output to proper indentation
1060 indent_re = re.compile('^', re.M)
1061 feedback_fn("* Hooks Results")
1062 if not hooks_results:
1063 feedback_fn(" - ERROR: general communication failure")
1066 for node_name in hooks_results:
1067 show_node_header = True
1068 res = hooks_results[node_name]
1069 if res is False or not isinstance(res, list):
1070 feedback_fn(" Communication failure")
1073 for script, hkr, output in res:
1074 if hkr == constants.HKR_FAIL:
1075 # The node header is only shown once, if there are
1076 # failing hooks on that node
1077 if show_node_header:
1078 feedback_fn(" Node %s:" % node_name)
1079 show_node_header = False
1080 feedback_fn(" ERROR: Script %s failed, output:" % script)
1081 output = indent_re.sub(' ', output)
1082 feedback_fn("%s" % output)
1088 class LUVerifyDisks(NoHooksLU):
1089 """Verifies the cluster disks status.
1094 def CheckPrereq(self):
1095 """Check prerequisites.
1097 This has no prerequisites.
1102 def Exec(self, feedback_fn):
1103 """Verify integrity of cluster disks.
1106 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1108 vg_name = self.cfg.GetVGName()
1109 nodes = utils.NiceSort(self.cfg.GetNodeList())
1110 instances = [self.cfg.GetInstanceInfo(name)
1111 for name in self.cfg.GetInstanceList()]
1114 for inst in instances:
1116 if (inst.status != "up" or
1117 inst.disk_template not in constants.DTS_NET_MIRROR):
1119 inst.MapLVsByNode(inst_lvs)
1120 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1121 for node, vol_list in inst_lvs.iteritems():
1122 for vol in vol_list:
1123 nv_dict[(node, vol)] = inst
1128 node_lvs = rpc.call_volume_list(nodes, vg_name)
1133 lvs = node_lvs[node]
1135 if isinstance(lvs, basestring):
1136 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1137 res_nlvm[node] = lvs
1138 elif not isinstance(lvs, dict):
1139 logger.Info("connection to node %s failed or invalid data returned" %
1141 res_nodes.append(node)
1144 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1145 inst = nv_dict.pop((node, lv_name), None)
1146 if (not lv_online and inst is not None
1147 and inst.name not in res_instances):
1148 res_instances.append(inst.name)
1150 # any leftover items in nv_dict are missing LVs, let's arrange the
1152 for key, inst in nv_dict.iteritems():
1153 if inst.name not in res_missing:
1154 res_missing[inst.name] = []
1155 res_missing[inst.name].append(key)
1160 class LURenameCluster(LogicalUnit):
1161 """Rename the cluster.
1164 HPATH = "cluster-rename"
1165 HTYPE = constants.HTYPE_CLUSTER
1168 def BuildHooksEnv(self):
1173 "OP_TARGET": self.sstore.GetClusterName(),
1174 "NEW_NAME": self.op.name,
1176 mn = self.sstore.GetMasterNode()
1177 return env, [mn], [mn]
1179 def CheckPrereq(self):
1180 """Verify that the passed name is a valid one.
1183 hostname = utils.HostInfo(self.op.name)
1185 new_name = hostname.name
1186 self.ip = new_ip = hostname.ip
1187 old_name = self.sstore.GetClusterName()
1188 old_ip = self.sstore.GetMasterIP()
1189 if new_name == old_name and new_ip == old_ip:
1190 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1191 " cluster has changed")
1192 if new_ip != old_ip:
1193 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1194 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1195 " reachable on the network. Aborting." %
1198 self.op.name = new_name
1200 def Exec(self, feedback_fn):
1201 """Rename the cluster.
1204 clustername = self.op.name
1208 # shutdown the master IP
1209 master = ss.GetMasterNode()
1210 if not rpc.call_node_stop_master(master):
1211 raise errors.OpExecError("Could not disable the master role")
1215 ss.SetKey(ss.SS_MASTER_IP, ip)
1216 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1218 # Distribute updated ss config to all nodes
1219 myself = self.cfg.GetNodeInfo(master)
1220 dist_nodes = self.cfg.GetNodeList()
1221 if myself.name in dist_nodes:
1222 dist_nodes.remove(myself.name)
1224 logger.Debug("Copying updated ssconf data to all nodes")
1225 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1226 fname = ss.KeyToFilename(keyname)
1227 result = rpc.call_upload_file(dist_nodes, fname)
1228 for to_node in dist_nodes:
1229 if not result[to_node]:
1230 logger.Error("copy of file %s to node %s failed" %
1233 if not rpc.call_node_start_master(master):
1234 logger.Error("Could not re-enable the master role on the master,"
1235 " please restart manually.")
1238 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1239 """Sleep and poll for an instance's disk to sync.
1242 if not instance.disks:
1246 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1248 node = instance.primary_node
1250 for dev in instance.disks:
1251 cfgw.SetDiskID(dev, node)
1257 cumul_degraded = False
1258 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1260 proc.LogWarning("Can't get any data from node %s" % node)
1263 raise errors.RemoteError("Can't contact node %s for mirror data,"
1264 " aborting." % node)
1268 for i in range(len(rstats)):
1271 proc.LogWarning("Can't compute data for node %s/%s" %
1272 (node, instance.disks[i].iv_name))
1274 # we ignore the ldisk parameter
1275 perc_done, est_time, is_degraded, _ = mstat
1276 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1277 if perc_done is not None:
1279 if est_time is not None:
1280 rem_time = "%d estimated seconds remaining" % est_time
1283 rem_time = "no time estimate"
1284 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1285 (instance.disks[i].iv_name, perc_done, rem_time))
1292 time.sleep(min(60, max_time))
1298 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1299 return not cumul_degraded
1302 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1303 """Check that mirrors are not degraded.
1305 The ldisk parameter, if True, will change the test from the
1306 is_degraded attribute (which represents overall non-ok status for
1307 the device(s)) to the ldisk (representing the local storage status).
1310 cfgw.SetDiskID(dev, node)
1317 if on_primary or dev.AssembleOnSecondary():
1318 rstats = rpc.call_blockdev_find(node, dev)
1320 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1323 result = result and (not rstats[idx])
1325 for child in dev.children:
1326 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1331 class LUDiagnoseOS(NoHooksLU):
1332 """Logical unit for OS diagnose/query.
1335 _OP_REQP = ["output_fields", "names"]
1337 def CheckPrereq(self):
1338 """Check prerequisites.
1340 This always succeeds, since this is a pure query LU.
1344 raise errors.OpPrereqError("Selective OS query not supported")
1346 self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1347 _CheckOutputFields(static=[],
1348 dynamic=self.dynamic_fields,
1349 selected=self.op.output_fields)
1352 def _DiagnoseByOS(node_list, rlist):
1353 """Remaps a per-node return list into an a per-os per-node dictionary
1356 node_list: a list with the names of all nodes
1357 rlist: a map with node names as keys and OS objects as values
1360 map: a map with osnames as keys and as value another map, with
1362 keys and list of OS objects as values
1363 e.g. {"debian-etch": {"node1": [<object>,...],
1364 "node2": [<object>,]}
1369 for node_name, nr in rlist.iteritems():
1373 if os.name not in all_os:
1374 # build a list of nodes for this os containing empty lists
1375 # for each node in node_list
1376 all_os[os.name] = {}
1377 for nname in node_list:
1378 all_os[os.name][nname] = []
1379 all_os[os.name][node_name].append(os)
1382 def Exec(self, feedback_fn):
1383 """Compute the list of OSes.
1386 node_list = self.cfg.GetNodeList()
1387 node_data = rpc.call_os_diagnose(node_list)
1388 if node_data == False:
1389 raise errors.OpExecError("Can't gather the list of OSes")
1390 pol = self._DiagnoseByOS(node_list, node_data)
1392 for os_name, os_data in pol.iteritems():
1394 for field in self.op.output_fields:
1397 elif field == "valid":
1398 val = utils.all([osl and osl[0] for osl in os_data.values()])
1399 elif field == "node_status":
1401 for node_name, nos_list in os_data.iteritems():
1402 val[node_name] = [(v.status, v.path) for v in nos_list]
1404 raise errors.ParameterError(field)
1411 class LURemoveNode(LogicalUnit):
1412 """Logical unit for removing a node.
1415 HPATH = "node-remove"
1416 HTYPE = constants.HTYPE_NODE
1417 _OP_REQP = ["node_name"]
1419 def BuildHooksEnv(self):
1422 This doesn't run on the target node in the pre phase as a failed
1423 node would not allows itself to run.
1427 "OP_TARGET": self.op.node_name,
1428 "NODE_NAME": self.op.node_name,
1430 all_nodes = self.cfg.GetNodeList()
1431 all_nodes.remove(self.op.node_name)
1432 return env, all_nodes, all_nodes
1434 def CheckPrereq(self):
1435 """Check prerequisites.
1438 - the node exists in the configuration
1439 - it does not have primary or secondary instances
1440 - it's not the master
1442 Any errors are signalled by raising errors.OpPrereqError.
1445 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1447 raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1449 instance_list = self.cfg.GetInstanceList()
1451 masternode = self.sstore.GetMasterNode()
1452 if node.name == masternode:
1453 raise errors.OpPrereqError("Node is the master node,"
1454 " you need to failover first.")
1456 for instance_name in instance_list:
1457 instance = self.cfg.GetInstanceInfo(instance_name)
1458 if node.name == instance.primary_node:
1459 raise errors.OpPrereqError("Instance %s still running on the node,"
1460 " please remove first." % instance_name)
1461 if node.name in instance.secondary_nodes:
1462 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1463 " please remove first." % instance_name)
1464 self.op.node_name = node.name
1467 def Exec(self, feedback_fn):
1468 """Removes the node from the cluster.
1472 logger.Info("stopping the node daemon and removing configs from node %s" %
1475 rpc.call_node_leave_cluster(node.name)
1477 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1479 logger.Info("Removing node %s from config" % node.name)
1481 self.cfg.RemoveNode(node.name)
1483 _RemoveHostFromEtcHosts(node.name)
1486 class LUQueryNodes(NoHooksLU):
1487 """Logical unit for querying nodes.
1490 _OP_REQP = ["output_fields", "names"]
1492 def CheckPrereq(self):
1493 """Check prerequisites.
1495 This checks that the fields required are valid output fields.
1498 self.dynamic_fields = frozenset([
1500 "mtotal", "mnode", "mfree",
1505 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1506 "pinst_list", "sinst_list",
1507 "pip", "sip", "tags"],
1508 dynamic=self.dynamic_fields,
1509 selected=self.op.output_fields)
1511 self.wanted = _GetWantedNodes(self, self.op.names)
1513 def Exec(self, feedback_fn):
1514 """Computes the list of nodes and their attributes.
1517 nodenames = self.wanted
1518 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1520 # begin data gathering
1522 if self.dynamic_fields.intersection(self.op.output_fields):
1524 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1525 for name in nodenames:
1526 nodeinfo = node_data.get(name, None)
1529 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1530 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1531 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1532 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1533 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1534 "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1535 "bootid": nodeinfo['bootid'],
1538 live_data[name] = {}
1540 live_data = dict.fromkeys(nodenames, {})
1542 node_to_primary = dict([(name, set()) for name in nodenames])
1543 node_to_secondary = dict([(name, set()) for name in nodenames])
1545 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1546 "sinst_cnt", "sinst_list"))
1547 if inst_fields & frozenset(self.op.output_fields):
1548 instancelist = self.cfg.GetInstanceList()
1550 for instance_name in instancelist:
1551 inst = self.cfg.GetInstanceInfo(instance_name)
1552 if inst.primary_node in node_to_primary:
1553 node_to_primary[inst.primary_node].add(inst.name)
1554 for secnode in inst.secondary_nodes:
1555 if secnode in node_to_secondary:
1556 node_to_secondary[secnode].add(inst.name)
1558 # end data gathering
1561 for node in nodelist:
1563 for field in self.op.output_fields:
1566 elif field == "pinst_list":
1567 val = list(node_to_primary[node.name])
1568 elif field == "sinst_list":
1569 val = list(node_to_secondary[node.name])
1570 elif field == "pinst_cnt":
1571 val = len(node_to_primary[node.name])
1572 elif field == "sinst_cnt":
1573 val = len(node_to_secondary[node.name])
1574 elif field == "pip":
1575 val = node.primary_ip
1576 elif field == "sip":
1577 val = node.secondary_ip
1578 elif field == "tags":
1579 val = list(node.GetTags())
1580 elif field in self.dynamic_fields:
1581 val = live_data[node.name].get(field, None)
1583 raise errors.ParameterError(field)
1584 node_output.append(val)
1585 output.append(node_output)
1590 class LUQueryNodeVolumes(NoHooksLU):
1591 """Logical unit for getting volumes on node(s).
1594 _OP_REQP = ["nodes", "output_fields"]
1596 def CheckPrereq(self):
1597 """Check prerequisites.
1599 This checks that the fields required are valid output fields.
1602 self.nodes = _GetWantedNodes(self, self.op.nodes)
1604 _CheckOutputFields(static=["node"],
1605 dynamic=["phys", "vg", "name", "size", "instance"],
1606 selected=self.op.output_fields)
1609 def Exec(self, feedback_fn):
1610 """Computes the list of nodes and their attributes.
1613 nodenames = self.nodes
1614 volumes = rpc.call_node_volumes(nodenames)
1616 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1617 in self.cfg.GetInstanceList()]
1619 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1622 for node in nodenames:
1623 if node not in volumes or not volumes[node]:
1626 node_vols = volumes[node][:]
1627 node_vols.sort(key=lambda vol: vol['dev'])
1629 for vol in node_vols:
1631 for field in self.op.output_fields:
1634 elif field == "phys":
1638 elif field == "name":
1640 elif field == "size":
1641 val = int(float(vol['size']))
1642 elif field == "instance":
1644 if node not in lv_by_node[inst]:
1646 if vol['name'] in lv_by_node[inst][node]:
1652 raise errors.ParameterError(field)
1653 node_output.append(str(val))
1655 output.append(node_output)
1660 class LUAddNode(LogicalUnit):
1661 """Logical unit for adding node to the cluster.
1665 HTYPE = constants.HTYPE_NODE
1666 _OP_REQP = ["node_name"]
1668 def BuildHooksEnv(self):
1671 This will run on all nodes before, and on all nodes + the new node after.
1675 "OP_TARGET": self.op.node_name,
1676 "NODE_NAME": self.op.node_name,
1677 "NODE_PIP": self.op.primary_ip,
1678 "NODE_SIP": self.op.secondary_ip,
1680 nodes_0 = self.cfg.GetNodeList()
1681 nodes_1 = nodes_0 + [self.op.node_name, ]
1682 return env, nodes_0, nodes_1
1684 def CheckPrereq(self):
1685 """Check prerequisites.
1688 - the new node is not already in the config
1690 - its parameters (single/dual homed) matches the cluster
1692 Any errors are signalled by raising errors.OpPrereqError.
1695 node_name = self.op.node_name
1698 dns_data = utils.HostInfo(node_name)
1700 node = dns_data.name
1701 primary_ip = self.op.primary_ip = dns_data.ip
1702 secondary_ip = getattr(self.op, "secondary_ip", None)
1703 if secondary_ip is None:
1704 secondary_ip = primary_ip
1705 if not utils.IsValidIP(secondary_ip):
1706 raise errors.OpPrereqError("Invalid secondary IP given")
1707 self.op.secondary_ip = secondary_ip
1709 node_list = cfg.GetNodeList()
1710 if not self.op.readd and node in node_list:
1711 raise errors.OpPrereqError("Node %s is already in the configuration" %
1713 elif self.op.readd and node not in node_list:
1714 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1716 for existing_node_name in node_list:
1717 existing_node = cfg.GetNodeInfo(existing_node_name)
1719 if self.op.readd and node == existing_node_name:
1720 if (existing_node.primary_ip != primary_ip or
1721 existing_node.secondary_ip != secondary_ip):
1722 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1723 " address configuration as before")
1726 if (existing_node.primary_ip == primary_ip or
1727 existing_node.secondary_ip == primary_ip or
1728 existing_node.primary_ip == secondary_ip or
1729 existing_node.secondary_ip == secondary_ip):
1730 raise errors.OpPrereqError("New node ip address(es) conflict with"
1731 " existing node %s" % existing_node.name)
1733 # check that the type of the node (single versus dual homed) is the
1734 # same as for the master
1735 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1736 master_singlehomed = myself.secondary_ip == myself.primary_ip
1737 newbie_singlehomed = secondary_ip == primary_ip
1738 if master_singlehomed != newbie_singlehomed:
1739 if master_singlehomed:
1740 raise errors.OpPrereqError("The master has no private ip but the"
1741 " new node has one")
1743 raise errors.OpPrereqError("The master has a private ip but the"
1744 " new node doesn't have one")
1746 # checks reachablity
1747 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1748 raise errors.OpPrereqError("Node not reachable by ping")
1750 if not newbie_singlehomed:
1751 # check reachability from my secondary ip to newbie's secondary ip
1752 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1753 source=myself.secondary_ip):
1754 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1755 " based ping to noded port")
1757 self.new_node = objects.Node(name=node,
1758 primary_ip=primary_ip,
1759 secondary_ip=secondary_ip)
1761 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1762 if not os.path.exists(constants.VNC_PASSWORD_FILE):
1763 raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1764 constants.VNC_PASSWORD_FILE)
1766 def Exec(self, feedback_fn):
1767 """Adds the new node to the cluster.
1770 new_node = self.new_node
1771 node = new_node.name
1773 # set up inter-node password and certificate and restarts the node daemon
1774 gntpass = self.sstore.GetNodeDaemonPassword()
1775 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1776 raise errors.OpExecError("ganeti password corruption detected")
1777 f = open(constants.SSL_CERT_FILE)
1779 gntpem = f.read(8192)
1782 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1783 # so we use this to detect an invalid certificate; as long as the
1784 # cert doesn't contain this, the here-document will be correctly
1785 # parsed by the shell sequence below
1786 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1787 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1788 if not gntpem.endswith("\n"):
1789 raise errors.OpExecError("PEM must end with newline")
1790 logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1792 # and then connect with ssh to set password and start ganeti-noded
1793 # note that all the below variables are sanitized at this point,
1794 # either by being constants or by the checks above
1796 mycommand = ("umask 077 && "
1797 "echo '%s' > '%s' && "
1798 "cat > '%s' << '!EOF.' && \n"
1799 "%s!EOF.\n%s restart" %
1800 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1801 constants.SSL_CERT_FILE, gntpem,
1802 constants.NODE_INITD_SCRIPT))
1804 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1806 raise errors.OpExecError("Remote command on node %s, error: %s,"
1808 (node, result.fail_reason, result.output))
1810 # check connectivity
1813 result = rpc.call_version([node])[node]
1815 if constants.PROTOCOL_VERSION == result:
1816 logger.Info("communication to node %s fine, sw version %s match" %
1819 raise errors.OpExecError("Version mismatch master version %s,"
1820 " node version %s" %
1821 (constants.PROTOCOL_VERSION, result))
1823 raise errors.OpExecError("Cannot get version from the new node")
1826 logger.Info("copy ssh key to node %s" % node)
1827 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1829 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1830 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1836 keyarray.append(f.read())
1840 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1841 keyarray[3], keyarray[4], keyarray[5])
1844 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1846 # Add node to our /etc/hosts, and add key to known_hosts
1847 _AddHostToEtcHosts(new_node.name)
1849 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1850 self.cfg.GetHostKey())
1852 if new_node.secondary_ip != new_node.primary_ip:
1853 if not rpc.call_node_tcp_ping(new_node.name,
1854 constants.LOCALHOST_IP_ADDRESS,
1855 new_node.secondary_ip,
1856 constants.DEFAULT_NODED_PORT,
1858 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1859 " you gave (%s). Please fix and re-run this"
1860 " command." % new_node.secondary_ip)
1862 success, msg = ssh.VerifyNodeHostname(node)
1864 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1865 " than the one the resolver gives: %s."
1866 " Please fix and re-run this command." %
1869 # Distribute updated /etc/hosts and known_hosts to all nodes,
1870 # including the node just added
1871 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1872 dist_nodes = self.cfg.GetNodeList()
1873 if not self.op.readd:
1874 dist_nodes.append(node)
1875 if myself.name in dist_nodes:
1876 dist_nodes.remove(myself.name)
1878 logger.Debug("Copying hosts and known_hosts to all nodes")
1879 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1880 result = rpc.call_upload_file(dist_nodes, fname)
1881 for to_node in dist_nodes:
1882 if not result[to_node]:
1883 logger.Error("copy of file %s to node %s failed" %
1886 to_copy = ss.GetFileList()
1887 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1888 to_copy.append(constants.VNC_PASSWORD_FILE)
1889 for fname in to_copy:
1890 if not ssh.CopyFileToNode(node, fname):
1891 logger.Error("could not copy file %s to node %s" % (fname, node))
1893 if not self.op.readd:
1894 logger.Info("adding node %s to cluster.conf" % node)
1895 self.cfg.AddNode(new_node)
1898 class LUMasterFailover(LogicalUnit):
1899 """Failover the master node to the current node.
1901 This is a special LU in that it must run on a non-master node.
1904 HPATH = "master-failover"
1905 HTYPE = constants.HTYPE_CLUSTER
1909 def BuildHooksEnv(self):
1912 This will run on the new master only in the pre phase, and on all
1913 the nodes in the post phase.
1917 "OP_TARGET": self.new_master,
1918 "NEW_MASTER": self.new_master,
1919 "OLD_MASTER": self.old_master,
1921 return env, [self.new_master], self.cfg.GetNodeList()
1923 def CheckPrereq(self):
1924 """Check prerequisites.
1926 This checks that we are not already the master.
1929 self.new_master = utils.HostInfo().name
1930 self.old_master = self.sstore.GetMasterNode()
1932 if self.old_master == self.new_master:
1933 raise errors.OpPrereqError("This commands must be run on the node"
1934 " where you want the new master to be."
1935 " %s is already the master" %
1938 def Exec(self, feedback_fn):
1939 """Failover the master node.
1941 This command, when run on a non-master node, will cause the current
1942 master to cease being master, and the non-master to become new
1946 #TODO: do not rely on gethostname returning the FQDN
1947 logger.Info("setting master to %s, old master: %s" %
1948 (self.new_master, self.old_master))
1950 if not rpc.call_node_stop_master(self.old_master):
1951 logger.Error("could disable the master role on the old master"
1952 " %s, please disable manually" % self.old_master)
1955 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1956 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1957 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1958 logger.Error("could not distribute the new simple store master file"
1959 " to the other nodes, please check.")
1961 if not rpc.call_node_start_master(self.new_master):
1962 logger.Error("could not start the master role on the new master"
1963 " %s, please check" % self.new_master)
1964 feedback_fn("Error in activating the master IP on the new master,"
1965 " please fix manually.")
1969 class LUQueryClusterInfo(NoHooksLU):
1970 """Query cluster configuration.
1976 def CheckPrereq(self):
1977 """No prerequsites needed for this LU.
1982 def Exec(self, feedback_fn):
1983 """Return cluster config.
1987 "name": self.sstore.GetClusterName(),
1988 "software_version": constants.RELEASE_VERSION,
1989 "protocol_version": constants.PROTOCOL_VERSION,
1990 "config_version": constants.CONFIG_VERSION,
1991 "os_api_version": constants.OS_API_VERSION,
1992 "export_version": constants.EXPORT_VERSION,
1993 "master": self.sstore.GetMasterNode(),
1994 "architecture": (platform.architecture()[0], platform.machine()),
1995 "hypervisor_type": self.sstore.GetHypervisorType(),
2001 class LUClusterCopyFile(NoHooksLU):
2002 """Copy file to cluster.
2005 _OP_REQP = ["nodes", "filename"]
2007 def CheckPrereq(self):
2008 """Check prerequisites.
2010 It should check that the named file exists and that the given list
2014 if not os.path.exists(self.op.filename):
2015 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
2017 self.nodes = _GetWantedNodes(self, self.op.nodes)
2019 def Exec(self, feedback_fn):
2020 """Copy a file from master to some nodes.
2023 opts - class with options as members
2024 args - list containing a single element, the file name
2026 nodes - list containing the name of target nodes; if empty, all nodes
2029 filename = self.op.filename
2031 myname = utils.HostInfo().name
2033 for node in self.nodes:
2036 if not ssh.CopyFileToNode(node, filename):
2037 logger.Error("Copy of file %s to node %s failed" % (filename, node))
2040 class LUDumpClusterConfig(NoHooksLU):
2041 """Return a text-representation of the cluster-config.
2046 def CheckPrereq(self):
2047 """No prerequisites.
2052 def Exec(self, feedback_fn):
2053 """Dump a representation of the cluster config to the standard output.
2056 return self.cfg.DumpConfig()
2059 class LURunClusterCommand(NoHooksLU):
2060 """Run a command on some nodes.
2063 _OP_REQP = ["command", "nodes"]
2065 def CheckPrereq(self):
2066 """Check prerequisites.
2068 It checks that the given list of nodes is valid.
2071 self.nodes = _GetWantedNodes(self, self.op.nodes)
2073 def Exec(self, feedback_fn):
2074 """Run a command on some nodes.
2077 # put the master at the end of the nodes list
2078 master_node = self.sstore.GetMasterNode()
2079 if master_node in self.nodes:
2080 self.nodes.remove(master_node)
2081 self.nodes.append(master_node)
2084 for node in self.nodes:
2085 result = ssh.SSHCall(node, "root", self.op.command)
2086 data.append((node, result.output, result.exit_code))
2091 class LUActivateInstanceDisks(NoHooksLU):
2092 """Bring up an instance's disks.
2095 _OP_REQP = ["instance_name"]
2097 def CheckPrereq(self):
2098 """Check prerequisites.
2100 This checks that the instance is in the cluster.
2103 instance = self.cfg.GetInstanceInfo(
2104 self.cfg.ExpandInstanceName(self.op.instance_name))
2105 if instance is None:
2106 raise errors.OpPrereqError("Instance '%s' not known" %
2107 self.op.instance_name)
2108 self.instance = instance
2111 def Exec(self, feedback_fn):
2112 """Activate the disks.
2115 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2117 raise errors.OpExecError("Cannot activate block devices")
2122 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2123 """Prepare the block devices for an instance.
2125 This sets up the block devices on all nodes.
2128 instance: a ganeti.objects.Instance object
2129 ignore_secondaries: if true, errors on secondary nodes won't result
2130 in an error return from the function
2133 false if the operation failed
2134 list of (host, instance_visible_name, node_visible_name) if the operation
2135 succeeded, with the mapping from node devices to instance devices
2139 iname = instance.name
2140 # With the two-pass mechanism we try to reduce the window of
2141 # opportunity for the race condition of switching DRBD to primary
2142 # before handshaking occurred, but we do not eliminate it
2144 # The proper fix would be to wait (with some limits) until the
2145 # connection has been made and drbd transitions from WFConnection
2146 # into any other network-connected state (Connected, SyncTarget,
2149 # 1st pass, assemble on all nodes in secondary mode
2150 for inst_disk in instance.disks:
2151 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2152 cfg.SetDiskID(node_disk, node)
2153 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2155 logger.Error("could not prepare block device %s on node %s"
2156 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2157 if not ignore_secondaries:
2160 # FIXME: race condition on drbd migration to primary
2162 # 2nd pass, do only the primary node
2163 for inst_disk in instance.disks:
2164 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2165 if node != instance.primary_node:
2167 cfg.SetDiskID(node_disk, node)
2168 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2170 logger.Error("could not prepare block device %s on node %s"
2171 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2173 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2175 # leave the disks configured for the primary node
2176 # this is a workaround that would be fixed better by
2177 # improving the logical/physical id handling
2178 for disk in instance.disks:
2179 cfg.SetDiskID(disk, instance.primary_node)
2181 return disks_ok, device_info
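# Minimal usage sketch (assuming a ConfigWriter `cfg` and a configured
# objects.Instance `instance`, as in LUActivateInstanceDisks above):
#
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node, iv_name, dev in device_info:
#     logger.Info("%s: %s assembled as %s" % (node, iv_name, dev))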
2184 def _StartInstanceDisks(cfg, instance, force):
2185 """Start the disks of an instance.
2188 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2189 ignore_secondaries=force)
2191 _ShutdownInstanceDisks(instance, cfg)
2192 if force is not None and not force:
2193 logger.Error("If the message above refers to a secondary node,"
2194 " you can retry the operation using '--force'.")
2195 raise errors.OpExecError("Disk consistency error")
2198 class LUDeactivateInstanceDisks(NoHooksLU):
2199 """Shutdown an instance's disks.
2202 _OP_REQP = ["instance_name"]
2204 def CheckPrereq(self):
2205 """Check prerequisites.
2207 This checks that the instance is in the cluster.
2210 instance = self.cfg.GetInstanceInfo(
2211 self.cfg.ExpandInstanceName(self.op.instance_name))
2212 if instance is None:
2213 raise errors.OpPrereqError("Instance '%s' not known" %
2214 self.op.instance_name)
2215 self.instance = instance
2217 def Exec(self, feedback_fn):
2218 """Deactivate the disks
2221 instance = self.instance
2222 ins_l = rpc.call_instance_list([instance.primary_node])
2223 ins_l = ins_l[instance.primary_node]
2224 if not isinstance(ins_l, list):
2225 raise errors.OpExecError("Can't contact node '%s'" %
2226 instance.primary_node)
2228 if self.instance.name in ins_l:
2229 raise errors.OpExecError("Instance is running, can't shutdown"
2232 _ShutdownInstanceDisks(instance, self.cfg)
2235 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2236 """Shutdown block devices of an instance.
2238 This does the shutdown on all nodes of the instance.
2240 If ignore_primary is false, errors on the primary node are taken into account and cause the whole shutdown to be reported as failed.
2245 for disk in instance.disks:
2246 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2247 cfg.SetDiskID(top_disk, node)
2248 if not rpc.call_blockdev_shutdown(node, top_disk):
2249 logger.Error("could not shutdown block device %s on node %s" %
2250 (disk.iv_name, node))
2251 if not ignore_primary or node != instance.primary_node:
2256 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2257 """Checks if a node has enough free memory.
2259 This function checks if a given node has the needed amount of free
2260 memory. In case the node has less memory than requested, or we cannot
2261 get the information from the node, this function raises an OpPrereqError exception.
2265 - cfg: a ConfigWriter instance
2266 - node: the node name
2267 - reason: string to use in the error message
2268 - requested: the amount of memory in MiB
2271 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2272 if not (nodeinfo and isinstance(nodeinfo, dict) and
2273 node in nodeinfo and isinstance(nodeinfo[node], dict)):
2274 raise errors.OpPrereqError("Could not contact node %s for resource"
2275 " information" % (node,))
2277 free_mem = nodeinfo[node].get('memory_free')
2278 if not isinstance(free_mem, int):
2279 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2280 " was '%s'" % (node, free_mem))
2281 if requested > free_mem:
2282 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2283 " needed %s MiB, available %s MiB" %
2284 (node, reason, requested, free_mem))
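# Minimal usage sketch: to verify that a node can host an instance
# before starting it, one calls
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)
#
# which raises OpPrereqError if fewer MiB are free than requested;
# LUStartupInstance.CheckPrereq below uses it exactly this way.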
2287 class LUStartupInstance(LogicalUnit):
2288 """Starts an instance.
2291 HPATH = "instance-start"
2292 HTYPE = constants.HTYPE_INSTANCE
2293 _OP_REQP = ["instance_name", "force"]
2295 def BuildHooksEnv(self):
2298 This runs on master, primary and secondary nodes of the instance.
2302 "FORCE": self.op.force,
2304 env.update(_BuildInstanceHookEnvByObject(self.instance))
2305 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2306 list(self.instance.secondary_nodes))
2309 def CheckPrereq(self):
2310 """Check prerequisites.
2312 This checks that the instance is in the cluster.
2315 instance = self.cfg.GetInstanceInfo(
2316 self.cfg.ExpandInstanceName(self.op.instance_name))
2317 if instance is None:
2318 raise errors.OpPrereqError("Instance '%s' not known" %
2319 self.op.instance_name)
2321 # check bridge existence
2322 _CheckInstanceBridgesExist(instance)
2324 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2325 "starting instance %s" % instance.name,
2328 self.instance = instance
2329 self.op.instance_name = instance.name
2331 def Exec(self, feedback_fn):
2332 """Start the instance.
2335 instance = self.instance
2336 force = self.op.force
2337 extra_args = getattr(self.op, "extra_args", "")
2339 self.cfg.MarkInstanceUp(instance.name)
2341 node_current = instance.primary_node
2343 _StartInstanceDisks(self.cfg, instance, force)
2345 if not rpc.call_instance_start(node_current, instance, extra_args):
2346 _ShutdownInstanceDisks(instance, self.cfg)
2347 raise errors.OpExecError("Could not start instance")
2350 class LURebootInstance(LogicalUnit):
2351 """Reboot an instance.
2354 HPATH = "instance-reboot"
2355 HTYPE = constants.HTYPE_INSTANCE
2356 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2358 def BuildHooksEnv(self):
2361 This runs on master, primary and secondary nodes of the instance.
2365 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2367 env.update(_BuildInstanceHookEnvByObject(self.instance))
2368 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2369 list(self.instance.secondary_nodes))
2372 def CheckPrereq(self):
2373 """Check prerequisites.
2375 This checks that the instance is in the cluster.
2378 instance = self.cfg.GetInstanceInfo(
2379 self.cfg.ExpandInstanceName(self.op.instance_name))
2380 if instance is None:
2381 raise errors.OpPrereqError("Instance '%s' not known" %
2382 self.op.instance_name)
2385 # check bridge existence
2385 _CheckInstanceBridgesExist(instance)
2387 self.instance = instance
2388 self.op.instance_name = instance.name
2390 def Exec(self, feedback_fn):
2391 """Reboot the instance.
2394 instance = self.instance
2395 ignore_secondaries = self.op.ignore_secondaries
2396 reboot_type = self.op.reboot_type
2397 extra_args = getattr(self.op, "extra_args", "")
2399 node_current = instance.primary_node
2401 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2402 constants.INSTANCE_REBOOT_HARD,
2403 constants.INSTANCE_REBOOT_FULL]:
2404 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2405 (constants.INSTANCE_REBOOT_SOFT,
2406 constants.INSTANCE_REBOOT_HARD,
2407 constants.INSTANCE_REBOOT_FULL))
2409 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2410 constants.INSTANCE_REBOOT_HARD]:
2411 if not rpc.call_instance_reboot(node_current, instance,
2412 reboot_type, extra_args):
2413 raise errors.OpExecError("Could not reboot instance")
2415 if not rpc.call_instance_shutdown(node_current, instance):
2416 raise errors.OpExecError("could not shutdown instance for full reboot")
2417 _ShutdownInstanceDisks(instance, self.cfg)
2418 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2419 if not rpc.call_instance_start(node_current, instance, extra_args):
2420 _ShutdownInstanceDisks(instance, self.cfg)
2421 raise errors.OpExecError("Could not start instance for full reboot")
2423 self.cfg.MarkInstanceUp(instance.name)
2426 class LUShutdownInstance(LogicalUnit):
2427 """Shutdown an instance.
2430 HPATH = "instance-stop"
2431 HTYPE = constants.HTYPE_INSTANCE
2432 _OP_REQP = ["instance_name"]
2434 def BuildHooksEnv(self):
2437 This runs on master, primary and secondary nodes of the instance.
2440 env = _BuildInstanceHookEnvByObject(self.instance)
2441 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2442 list(self.instance.secondary_nodes))
2445 def CheckPrereq(self):
2446 """Check prerequisites.
2448 This checks that the instance is in the cluster.
2451 instance = self.cfg.GetInstanceInfo(
2452 self.cfg.ExpandInstanceName(self.op.instance_name))
2453 if instance is None:
2454 raise errors.OpPrereqError("Instance '%s' not known" %
2455 self.op.instance_name)
2456 self.instance = instance
2458 def Exec(self, feedback_fn):
2459 """Shutdown the instance.
2462 instance = self.instance
2463 node_current = instance.primary_node
2464 self.cfg.MarkInstanceDown(instance.name)
2465 if not rpc.call_instance_shutdown(node_current, instance):
2466 logger.Error("could not shutdown instance")
2468 _ShutdownInstanceDisks(instance, self.cfg)
2471 class LUReinstallInstance(LogicalUnit):
2472 """Reinstall an instance.
2475 HPATH = "instance-reinstall"
2476 HTYPE = constants.HTYPE_INSTANCE
2477 _OP_REQP = ["instance_name"]
2479 def BuildHooksEnv(self):
2482 This runs on master, primary and secondary nodes of the instance.
2485 env = _BuildInstanceHookEnvByObject(self.instance)
2486 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2487 list(self.instance.secondary_nodes))
2490 def CheckPrereq(self):
2491 """Check prerequisites.
2493 This checks that the instance is in the cluster and is not running.
2496 instance = self.cfg.GetInstanceInfo(
2497 self.cfg.ExpandInstanceName(self.op.instance_name))
2498 if instance is None:
2499 raise errors.OpPrereqError("Instance '%s' not known" %
2500 self.op.instance_name)
2501 if instance.disk_template == constants.DT_DISKLESS:
2502 raise errors.OpPrereqError("Instance '%s' has no disks" %
2503 self.op.instance_name)
2504 if instance.status != "down":
2505 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2506 self.op.instance_name)
2507 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2509 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2510 (self.op.instance_name,
2511 instance.primary_node))
2513 self.op.os_type = getattr(self.op, "os_type", None)
2514 if self.op.os_type is not None:
2516 pnode = self.cfg.GetNodeInfo(
2517 self.cfg.ExpandNodeName(instance.primary_node))
2519 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2521 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2523 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2524 " primary node" % self.op.os_type)
2526 self.instance = instance
2528 def Exec(self, feedback_fn):
2529 """Reinstall the instance.
2532 inst = self.instance
2534 if self.op.os_type is not None:
2535 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2536 inst.os = self.op.os_type
2537 self.cfg.AddInstance(inst)
2539 _StartInstanceDisks(self.cfg, inst, None)
2541 feedback_fn("Running the instance OS create scripts...")
2542 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2543 raise errors.OpExecError("Could not install OS for instance %s"
2545 (inst.name, inst.primary_node))
2547 _ShutdownInstanceDisks(inst, self.cfg)
2550 class LURenameInstance(LogicalUnit):
2551 """Rename an instance.
2554 HPATH = "instance-rename"
2555 HTYPE = constants.HTYPE_INSTANCE
2556 _OP_REQP = ["instance_name", "new_name"]
2558 def BuildHooksEnv(self):
2561 This runs on master, primary and secondary nodes of the instance.
2564 env = _BuildInstanceHookEnvByObject(self.instance)
2565 env["INSTANCE_NEW_NAME"] = self.op.new_name
2566 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2567 list(self.instance.secondary_nodes))
2570 def CheckPrereq(self):
2571 """Check prerequisites.
2573 This checks that the instance is in the cluster and is not running.
2576 instance = self.cfg.GetInstanceInfo(
2577 self.cfg.ExpandInstanceName(self.op.instance_name))
2578 if instance is None:
2579 raise errors.OpPrereqError("Instance '%s' not known" %
2580 self.op.instance_name)
2581 if instance.status != "down":
2582 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2583 self.op.instance_name)
2584 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2586 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2587 (self.op.instance_name,
2588 instance.primary_node))
2589 self.instance = instance
2591 # new name verification
2592 name_info = utils.HostInfo(self.op.new_name)
2594 self.op.new_name = new_name = name_info.name
2595 instance_list = self.cfg.GetInstanceList()
2596 if new_name in instance_list:
2597 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2600 if not getattr(self.op, "ignore_ip", False):
2601 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2602 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2603 (name_info.ip, new_name))
2606 def Exec(self, feedback_fn):
2607 """Reinstall the instance.
2610 inst = self.instance
2611 old_name = inst.name
2613 self.cfg.RenameInstance(inst.name, self.op.new_name)
2615 # re-read the instance from the configuration after rename
2616 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2618 _StartInstanceDisks(self.cfg, inst, None)
2620 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2622 msg = ("Could not run OS rename script for instance %s on node %s"
2623 " (but the instance has been renamed in Ganeti)" %
2624 (inst.name, inst.primary_node))
2627 _ShutdownInstanceDisks(inst, self.cfg)
2630 class LURemoveInstance(LogicalUnit):
2631 """Remove an instance.
2634 HPATH = "instance-remove"
2635 HTYPE = constants.HTYPE_INSTANCE
2636 _OP_REQP = ["instance_name", "ignore_failures"]
2638 def BuildHooksEnv(self):
2641 This runs on master, primary and secondary nodes of the instance.
2644 env = _BuildInstanceHookEnvByObject(self.instance)
2645 nl = [self.sstore.GetMasterNode()]
2648 def CheckPrereq(self):
2649 """Check prerequisites.
2651 This checks that the instance is in the cluster.
2654 instance = self.cfg.GetInstanceInfo(
2655 self.cfg.ExpandInstanceName(self.op.instance_name))
2656 if instance is None:
2657 raise errors.OpPrereqError("Instance '%s' not known" %
2658 self.op.instance_name)
2659 self.instance = instance
2661 def Exec(self, feedback_fn):
2662 """Remove the instance.
2665 instance = self.instance
2666 logger.Info("shutting down instance %s on node %s" %
2667 (instance.name, instance.primary_node))
2669 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2670 if self.op.ignore_failures:
2671 feedback_fn("Warning: can't shutdown instance")
2673 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2674 (instance.name, instance.primary_node))
2676 logger.Info("removing block devices for instance %s" % instance.name)
2678 if not _RemoveDisks(instance, self.cfg):
2679 if self.op.ignore_failures:
2680 feedback_fn("Warning: can't remove instance's disks")
2682 raise errors.OpExecError("Can't remove instance's disks")
2684 logger.Info("removing instance %s out of cluster config" % instance.name)
2686 self.cfg.RemoveInstance(instance.name)
2689 class LUQueryInstances(NoHooksLU):
2690 """Logical unit for querying instances.
2693 _OP_REQP = ["output_fields", "names"]
2695 def CheckPrereq(self):
2696 """Check prerequisites.
2698 This checks that the fields required are valid output fields.
2701 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2702 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2703 "admin_state", "admin_ram",
2704 "disk_template", "ip", "mac", "bridge",
2705 "sda_size", "sdb_size", "vcpus", "tags",
2707 "network_port", "kernel_path", "initrd_path",
2708 "hvm_boot_order", "hvm_acpi", "hvm_pae",
2709 "hvm_cdrom_image_path", "hvm_nic_type",
2710 "hvm_disk_type", "vnc_bind_address"],
2711 dynamic=self.dynamic_fields,
2712 selected=self.op.output_fields)
2715 self.wanted = _GetWantedInstances(self, self.op.names)
2717 def Exec(self, feedback_fn):
2718 """Computes the list of nodes and their attributes.
2721 instance_names = self.wanted
2722 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2725 # begin data gathering
2727 nodes = frozenset([inst.primary_node for inst in instance_list])
2730 if self.dynamic_fields.intersection(self.op.output_fields):
2732 node_data = rpc.call_all_instances_info(nodes)
2734 result = node_data[name]
2736 live_data.update(result)
2737 elif result is False:
2738 bad_nodes.append(name)
2739 # else no instance is alive
2741 live_data = dict([(name, {}) for name in instance_names])
2743 # end data gathering
2746 for instance in instance_list:
2748 for field in self.op.output_fields:
2753 elif field == "pnode":
2754 val = instance.primary_node
2755 elif field == "snodes":
2756 val = list(instance.secondary_nodes)
2757 elif field == "admin_state":
2758 val = (instance.status != "down")
2759 elif field == "oper_state":
2760 if instance.primary_node in bad_nodes:
2763 val = bool(live_data.get(instance.name))
2764 elif field == "status":
2765 if instance.primary_node in bad_nodes:
2766 val = "ERROR_nodedown"
2768 running = bool(live_data.get(instance.name))
2770 if instance.status != "down":
2775 if instance.status != "down":
2779 elif field == "admin_ram":
2780 val = instance.memory
2781 elif field == "oper_ram":
2782 if instance.primary_node in bad_nodes:
2784 elif instance.name in live_data:
2785 val = live_data[instance.name].get("memory", "?")
2788 elif field == "disk_template":
2789 val = instance.disk_template
2791 val = instance.nics[0].ip
2792 elif field == "bridge":
2793 val = instance.nics[0].bridge
2794 elif field == "mac":
2795 val = instance.nics[0].mac
2796 elif field == "sda_size" or field == "sdb_size":
2797 disk = instance.FindDisk(field[:3])
2802 elif field == "vcpus":
2803 val = instance.vcpus
2804 elif field == "tags":
2805 val = list(instance.GetTags())
2806 elif field == "auto_balance":
2807 val = instance.auto_balance
2808 elif field in ("network_port", "kernel_path", "initrd_path",
2809 "hvm_boot_order", "hvm_acpi", "hvm_pae",
2810 "hvm_cdrom_image_path", "hvm_nic_type",
2811 "hvm_disk_type", "vnc_bind_address"):
2812 val = getattr(instance, field, None)
2815 elif field in ("hvm_nic_type", "hvm_disk_type",
2816 "kernel_path", "initrd_path"):
2821 raise errors.ParameterError(field)
2828 class LUFailoverInstance(LogicalUnit):
2829 """Failover an instance.
2832 HPATH = "instance-failover"
2833 HTYPE = constants.HTYPE_INSTANCE
2834 _OP_REQP = ["instance_name", "ignore_consistency"]
2836 def BuildHooksEnv(self):
2839 This runs on master, primary and secondary nodes of the instance.
2843 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2845 env.update(_BuildInstanceHookEnvByObject(self.instance))
2846 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2849 def CheckPrereq(self):
2850 """Check prerequisites.
2852 This checks that the instance is in the cluster.
2855 instance = self.cfg.GetInstanceInfo(
2856 self.cfg.ExpandInstanceName(self.op.instance_name))
2857 if instance is None:
2858 raise errors.OpPrereqError("Instance '%s' not known" %
2859 self.op.instance_name)
2861 if instance.disk_template not in constants.DTS_NET_MIRROR:
2862 raise errors.OpPrereqError("Instance's disk layout is not"
2863 " network mirrored, cannot failover.")
2865 secondary_nodes = instance.secondary_nodes
2866 if not secondary_nodes:
2867 raise errors.ProgrammerError("no secondary node but using "
2868 "DT_REMOTE_RAID1 template")
2870 target_node = secondary_nodes[0]
2871 # check memory requirements on the secondary node
2872 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2873 instance.name, instance.memory)
2875 # check bridge existance
2876 brlist = [nic.bridge for nic in instance.nics]
2877 if not rpc.call_bridges_exist(target_node, brlist):
2878 raise errors.OpPrereqError("One or more target bridges %s does not"
2879 " exist on destination node '%s'" %
2880 (brlist, target_node))
2882 self.instance = instance
2884 def Exec(self, feedback_fn):
2885 """Failover an instance.
2887 The failover is done by shutting it down on its present node and
2888 starting it on the secondary.
2891 instance = self.instance
2893 source_node = instance.primary_node
2894 target_node = instance.secondary_nodes[0]
2896 feedback_fn("* checking disk consistency between source and target")
2897 for dev in instance.disks:
2898 # for remote_raid1, these are md over drbd
2899 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2900 if instance.status == "up" and not self.op.ignore_consistency:
2901 raise errors.OpExecError("Disk %s is degraded on target node,"
2902 " aborting failover." % dev.iv_name)
2904 feedback_fn("* shutting down instance on source node")
2905 logger.Info("Shutting down instance %s on node %s" %
2906 (instance.name, source_node))
2908 if not rpc.call_instance_shutdown(source_node, instance):
2909 if self.op.ignore_consistency:
2910 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2911 " anyway. Please make sure node %s is down" %
2912 (instance.name, source_node, source_node))
2914 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2915 (instance.name, source_node))
2917 feedback_fn("* deactivating the instance's disks on source node")
2918 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2919 raise errors.OpExecError("Can't shut down the instance's disks.")
2921 instance.primary_node = target_node
2922 # distribute new instance config to the other nodes
2923 self.cfg.Update(instance)
2925 # Only start the instance if it's marked as up
2926 if instance.status == "up":
2927 feedback_fn("* activating the instance's disks on target node")
2928 logger.Info("Starting instance %s on node %s" %
2929 (instance.name, target_node))
2931 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2932 ignore_secondaries=True)
2934 _ShutdownInstanceDisks(instance, self.cfg)
2935 raise errors.OpExecError("Can't activate the instance's disks")
2937 feedback_fn("* starting the instance on the target node")
2938 if not rpc.call_instance_start(target_node, instance, None):
2939 _ShutdownInstanceDisks(instance, self.cfg)
2940 raise errors.OpExecError("Could not start instance %s on node %s." %
2941 (instance.name, target_node))
2944 class LUMigrateInstance(LogicalUnit):
2945 """Migrate an instance.
2947 This is migration without shutting down, compared to the failover,
2948 which is done with shutdown.
2951 HPATH = "instance-migrate"
2952 HTYPE = constants.HTYPE_INSTANCE
2953 _OP_REQP = ["instance_name", "live", "cleanup"]
2955 def BuildHooksEnv(self):
2958 This runs on master, primary and secondary nodes of the instance.
2961 env = _BuildInstanceHookEnvByObject(self.instance)
2962 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2965 def CheckPrereq(self):
2966 """Check prerequisites.
2968 This checks that the instance is in the cluster.
2971 instance = self.cfg.GetInstanceInfo(
2972 self.cfg.ExpandInstanceName(self.op.instance_name))
2973 if instance is None:
2974 raise errors.OpPrereqError("Instance '%s' not known" %
2975 self.op.instance_name)
2977 if instance.disk_template != constants.DT_DRBD8:
2978 raise errors.OpPrereqError("Instance's disk layout is not"
2979 " drbd8, cannot migrate.")
2981 secondary_nodes = instance.secondary_nodes
2982 if not secondary_nodes:
2983 raise errors.ProgrammerError("no secondary node but using "
2984 "drbd8 disk template")
2986 target_node = secondary_nodes[0]
2987 # check memory requirements on the secondary node
2988 _CheckNodeFreeMemory(self.cfg, target_node, "migrating instance %s" %
2989 instance.name, instance.memory)
2991 # check bridge existance
2992 brlist = [nic.bridge for nic in instance.nics]
2993 if not rpc.call_bridges_exist(target_node, brlist):
2994 raise errors.OpPrereqError("One or more target bridges %s does not"
2995 " exist on destination node '%s'" %
2996 (brlist, target_node))
2998 if not self.op.cleanup:
2999 migratable = rpc.call_instance_migratable(instance.primary_node,
3002 raise errors.OpPrereqError("Can't contact node '%s'" %
3003 instance.primary_node)
3004 if not migratable[0]:
3005 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3008 self.instance = instance
3010 def _WaitUntilSync(self):
3011 """Poll with custom rpc for disk sync.
3013 This uses our own step-based rpc call.
3016 self.feedback_fn("* wait until resync is done")
3020 result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3021 self.instance.disks,
3022 self.nodes_ip, False,
3023 constants.DRBD_RECONF_RPC_WFSYNC)
3025 for node in self.all_nodes:
3026 if not result[node] or not result[node][0]:
3027 raise errors.OpExecError("Cannot resync disks on node %s" % (node,))
3028 node_done, node_percent = result[node][1]
3029 all_done = all_done and node_done
3030 if node_percent is not None:
3031 min_percent = min(min_percent, node_percent)
3033 if min_percent < 100:
3034 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3037 def _EnsureSecondary(self, node):
3038 """Demote a node to secondary.
3041 self.feedback_fn("* switching node %s to secondary mode" % node)
3042 result = rpc.call_drbd_reconfig_net([node], self.instance.name,
3043 self.instance.disks,
3044 self.nodes_ip, False,
3045 constants.DRBD_RECONF_RPC_SECONDARY)
3046 if not result[node] or not result[node][0]:
3047 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3049 (node, result[node][1]))
3051 def _GoStandalone(self):
3052 """Disconnect from the network.
3055 self.feedback_fn("* changing into standalone mode")
3056 result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3057 self.instance.disks,
3058 self.nodes_ip, True,
3059 constants.DRBD_RECONF_RPC_DISCONNECT)
3060 for node in self.all_nodes:
3061 if not result[node] or not result[node][0]:
3062 raise errors.OpExecError("Cannot disconnect disks node %s,"
3063 " error %s" % (node, result[node][1]))
3065 def _GoReconnect(self, multimaster):
3066 """Reconnect to the network.
3072 msg = "single-master"
3073 self.feedback_fn("* changing disks into %s mode" % msg)
3074 result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3075 self.instance.disks,
3078 constants.DRBD_RECONF_RPC_RECONNECT)
3079 for node in self.all_nodes:
3080 if not result[node] or not result[node][0]:
3081 raise errors.OpExecError("Cannot change disks config on node %s,"
3082 " error %s" % (node, result[node][1]))
3084 def _IdentifyDisks(self):
3085 """Start the migration RPC sequence.
3088 self.feedback_fn("* identifying disks")
3089 result = rpc.call_drbd_reconfig_net(self.all_nodes,
3091 self.instance.disks,
3092 self.nodes_ip, True,
3093 constants.DRBD_RECONF_RPC_INIT)
3094 for node in self.all_nodes:
3095 if not result[node] or not result[node][0]:
3096 raise errors.OpExecError("Cannot identify disks node %s,"
3097 " error %s" % (node, result[node][1]))
3099 def _ExecCleanup(self):
3100 """Try to cleanup after a failed migration.
3102 The cleanup is done by:
3103 - check that the instance is running only on one node
3104 (and update the config if needed)
3105 - change disks on its secondary node to secondary
3106 - wait until disks are fully synchronized
3107 - disconnect from the network
3108 - change disks into single-master mode
3109 - wait again until disks are fully synchronized
3112 instance = self.instance
3113 target_node = self.target_node
3114 source_node = self.source_node
3116 # check running on only one node
3117 self.feedback_fn("* checking where the instance actually runs"
3118 " (if this hangs, the hypervisor might be in"
3120 ins_l = rpc.call_instance_list(self.all_nodes)
3121 for node in self.all_nodes:
3122 if not isinstance(ins_l[node], list):
3123 raise errors.OpExecError("Can't contact node '%s'" % node)
3125 runningon_source = instance.name in ins_l[source_node]
3126 runningon_target = instance.name in ins_l[target_node]
3128 if runningon_source and runningon_target:
3129 raise errors.OpExecError("Instance seems to be running on two nodes,"
3130 " or the hypervisor is confused. You will have"
3131 " to ensure manually that it runs only on one"
3132 " and restart this operation.")
3134 if not (runningon_source or runningon_target):
3135 raise errors.OpExecError("Instance does not seem to be running at all."
3136 " In this case, it's safer to repair by"
3137 " running 'gnt-instance stop' to ensure disk"
3138 " shutdown, and then restarting it.")
3140 if runningon_target:
3141 # the migration has actually succeeded, we need to update the config
3142 self.feedback_fn("* instance running on secondary node (%s),"
3143 " updating config" % target_node)
3144 instance.primary_node = target_node
3145 self.cfg.Update(instance)
3146 demoted_node = source_node
3148 self.feedback_fn("* instance confirmed to be running on its"
3149 " primary node (%s)" % source_node)
3150 demoted_node = target_node
3152 self._IdentifyDisks()
3154 self._EnsureSecondary(demoted_node)
3155 self._WaitUntilSync()
3156 self._GoStandalone()
3157 self._GoReconnect(False)
3158 self._WaitUntilSync()
3160 self.feedback_fn("* done")
3162 def _ExecMigration(self):
3163 """Migrate an instance.
3165 The migrate is done by:
3166 - change the disks into dual-master mode
3167 - wait until disks are fully synchronized again
3168 - migrate the instance
3169 - change disks on the new secondary node (the old primary) to secondary
3170 - wait until disks are fully synchronized
3171 - change disks into single-master mode
3174 instance = self.instance
3175 target_node = self.target_node
3176 source_node = self.source_node
3178 self.feedback_fn("* checking disk consistency between source and target")
3179 for dev in instance.disks:
3180 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
3181 raise errors.OpExecError("Disk %s is degraded or not fully"
3182 " synchronized on target node,"
3183 " aborting migrate." % dev.iv_name)
3185 self._IdentifyDisks()
3187 self._EnsureSecondary(target_node)
3188 self._GoStandalone()
3189 self._GoReconnect(True)
3190 self._WaitUntilSync()
3192 self.feedback_fn("* migrating instance to %s" % target_node)
3194 result = rpc.call_instance_migrate(source_node, instance,
3195 self.nodes_ip[target_node],
3197 if not result or not result[0]:
3198 logger.Error("Instance migration failed, trying to revert disk status")
3200 self._EnsureSecondary(target_node)
3201 self._GoStandalone()
3202 self._GoReconnect(False)
3203 self._WaitUntilSync()
3204 except errors.OpExecError, err:
3205 logger.Error("Can't reconnect the drives: error '%s'\n"
3206 "Please look and recover the instance status" % str(err))
3208 raise errors.OpExecError("Could not migrate instance %s: %s" %
3209 (instance.name, result[1]))
3212 instance.primary_node = target_node
3213 # distribute new instance config to the other nodes
3214 self.cfg.Update(instance)
3216 self._EnsureSecondary(source_node)
3217 self._WaitUntilSync()
3218 self._GoStandalone()
3219 self._GoReconnect(False)
3220 self._WaitUntilSync()
3222 self.feedback_fn("* done")
3224 def Exec(self, feedback_fn):
3225 """Perform the migration.
3228 self.feedback_fn = feedback_fn
3230 self.source_node = self.instance.primary_node
3231 self.target_node = self.instance.secondary_nodes[0]
3232 self.all_nodes = [self.source_node, self.target_node]
3234 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3235 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3238 return self._ExecCleanup()
3240 return self._ExecMigration()
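# For reference, the happy path of _ExecMigration above performs, in
# order: _IdentifyDisks, _EnsureSecondary(target), _GoStandalone,
# _GoReconnect(True) (dual-master), _WaitUntilSync, the actual
# call_instance_migrate, then _EnsureSecondary(source), _WaitUntilSync,
# _GoStandalone, _GoReconnect(False) (single-master), _WaitUntilSync.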
3243 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
3244 """Create a tree of block devices on the primary node.
3246 This always creates all devices.
3250 for child in device.children:
3251 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
3254 cfg.SetDiskID(device, node)
3255 new_id = rpc.call_blockdev_create(node, device, device.size,
3256 instance.name, True, info)
3259 if device.physical_id is None:
3260 device.physical_id = new_id
3264 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
3265 """Create a tree of block devices on a secondary node.
3267 If this device type has to be created on secondaries, create it and
3270 If not, just recurse to children keeping the same 'force' value.
3273 if device.CreateOnSecondary():
3276 for child in device.children:
3277 if not _CreateBlockDevOnSecondary(cfg, node, instance,
3278 child, force, info):
3283 cfg.SetDiskID(device, node)
3284 new_id = rpc.call_blockdev_create(node, device, device.size,
3285 instance.name, False, info)
3288 if device.physical_id is None:
3289 device.physical_id = new_id
3293 def _GenerateUniqueNames(cfg, exts):
3294 """Generate a suitable LV name.
3296 This will generate a logical volume name for the given instance.
3301 new_id = cfg.GenerateUniqueID()
3302 results.append("%s%s" % (new_id, val))
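# Illustrative example (the IDs shown stand for freshly generated
# unique IDs; each extension gets its own):
#
#   _GenerateUniqueNames(cfg, [".sda", ".sdb"])
#   -> ["<uuid1>.sda", "<uuid2>.sdb"]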
3306 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
3307 """Generate a drbd device complete with its children.
3310 port = cfg.AllocatePort()
3311 vgname = cfg.GetVGName()
3312 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3313 logical_id=(vgname, names[0]))
3314 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3315 logical_id=(vgname, names[1]))
3316 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
3317 logical_id = (primary, secondary, port),
3318 children = [dev_data, dev_meta])
3322 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
3323 """Generate a drbd8 device complete with its children.
3326 port = cfg.AllocatePort()
3327 vgname = cfg.GetVGName()
3328 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3329 logical_id=(vgname, names[0]))
3330 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3331 logical_id=(vgname, names[1]))
3332 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3333 logical_id = (primary, secondary, port),
3334 children = [dev_data, dev_meta],
3338 def _GenerateDiskTemplate(cfg, template_name,
3339 instance_name, primary_node,
3340 secondary_nodes, disk_sz, swap_sz):
3341 """Generate the entire disk layout for a given template type.
3344 # TODO: compute space requirements
3346 vgname = cfg.GetVGName()
3347 if template_name == constants.DT_DISKLESS:
3349 elif template_name == constants.DT_PLAIN:
3350 if len(secondary_nodes) != 0:
3351 raise errors.ProgrammerError("Wrong template configuration")
3353 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3354 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3355 logical_id=(vgname, names[0]),
3357 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3358 logical_id=(vgname, names[1]),
3360 disks = [sda_dev, sdb_dev]
3361 elif template_name == constants.DT_LOCAL_RAID1:
3362 if len(secondary_nodes) != 0:
3363 raise errors.ProgrammerError("Wrong template configuration")
3366 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
3367 ".sdb_m1", ".sdb_m2"])
3368 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3369 logical_id=(vgname, names[0]))
3370 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3371 logical_id=(vgname, names[1]))
3372 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
3374 children = [sda_dev_m1, sda_dev_m2])
3375 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3376 logical_id=(vgname, names[2]))
3377 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3378 logical_id=(vgname, names[3]))
3379 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
3381 children = [sdb_dev_m1, sdb_dev_m2])
3382 disks = [md_sda_dev, md_sdb_dev]
3383 elif template_name == constants.DT_REMOTE_RAID1:
3384 if len(secondary_nodes) != 1:
3385 raise errors.ProgrammerError("Wrong template configuration")
3386 remote_node = secondary_nodes[0]
3387 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3388 ".sdb_data", ".sdb_meta"])
3389 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3390 disk_sz, names[0:2])
3391 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
3392 children = [drbd_sda_dev], size=disk_sz)
3393 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3394 swap_sz, names[2:4])
3395 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
3396 children = [drbd_sdb_dev], size=swap_sz)
3397 disks = [md_sda_dev, md_sdb_dev]
3398 elif template_name == constants.DT_DRBD8:
3399 if len(secondary_nodes) != 1:
3400 raise errors.ProgrammerError("Wrong template configuration")
3401 remote_node = secondary_nodes[0]
3402 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3403 ".sdb_data", ".sdb_meta"])
3404 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3405 disk_sz, names[0:2], "sda")
3406 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3407 swap_sz, names[2:4], "sdb")
3408 disks = [drbd_sda_dev, drbd_sdb_dev]
3410 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3414 def _GetInstanceInfoText(instance):
3415 """Compute that text that should be added to the disk's metadata.
3418 return "originstname+%s" % instance.name
3421 def _CreateDisks(cfg, instance):
3422 """Create all disks for an instance.
3424 This abstracts away some work from AddInstance.
3427 instance: the instance object
3430 True or False showing the success of the creation process
3433 info = _GetInstanceInfoText(instance)
3435 for device in instance.disks:
3436 logger.Info("creating volume %s for instance %s" %
3437 (device.iv_name, instance.name))
3439 for secondary_node in instance.secondary_nodes:
3440 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3441 device, False, info):
3442 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3443 (device.iv_name, device, secondary_node))
3446 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3447 instance, device, info):
3448 logger.Error("failed to create volume %s on primary!" %
3454 def _RemoveDisks(instance, cfg):
3455 """Remove all disks for an instance.
3457 This abstracts away some work from `AddInstance()` and
3458 `RemoveInstance()`. Note that in case some of the devices couldn't
3459 be removed, the removal will continue with the other ones (compare
3460 with `_CreateDisks()`).
3463 instance: the instance object
3466 True or False showing the success of the removal process
3469 logger.Info("removing block devices for instance %s" % instance.name)
3472 for device in instance.disks:
3473 for node, disk in device.ComputeNodeTree(instance.primary_node):
3474 cfg.SetDiskID(disk, node)
3475 if not rpc.call_blockdev_remove(node, disk):
3476 logger.Error("could not remove block device %s on node %s,"
3477 " continuing anyway" %
3478 (device.iv_name, node))
3483 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3484 """Compute disk size requirements in the volume group
3486 This is currently hard-coded for the two-drive layout.
3489 # Required free disk space as a function of disk and swap space
3491 constants.DT_DISKLESS: None,
3492 constants.DT_PLAIN: disk_size + swap_size,
3493 constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3494 # 256 MB are added for drbd metadata, 128 MB for each drbd device
3495 constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3496 constants.DT_DRBD8: disk_size + swap_size + 256,
3499 if disk_template not in req_size_dict:
3500 raise errors.ProgrammerError("Disk template '%s' size requirement"
3501 " is unknown" % disk_template)
3503 return req_size_dict[disk_template]
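# Worked example: for a DT_DRBD8 instance with disk_size=10240 and
# swap_size=4096 (both in MB), the space required in the volume group
# is 10240 + 4096 + 256 = 14592 MB, the extra 256 MB being the two
# 128 MB drbd metadata volumes.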
3506 class LUCreateInstance(LogicalUnit):
3507 """Create an instance.
3510 HPATH = "instance-add"
3511 HTYPE = constants.HTYPE_INSTANCE
3512 _OP_REQP = ["instance_name", "mem_size", "disk_size",
3513 "disk_template", "swap_size", "mode", "start", "vcpus",
3514 "wait_for_sync", "ip_check", "mac", "auto_balance"]
3516 def _RunAllocator(self):
3517 """Run the allocator based on input opcode.
3520 disks = [{"size": self.op.disk_size, "mode": "w"},
3521 {"size": self.op.swap_size, "mode": "w"}]
3522 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3523 "bridge": self.op.bridge}]
3524 ial = IAllocator(self.cfg, self.sstore,
3525 mode=constants.IALLOCATOR_MODE_ALLOC,
3526 name=self.op.instance_name,
3527 disk_template=self.op.disk_template,
3530 vcpus=self.op.vcpus,
3531 mem_size=self.op.mem_size,
3536 ial.Run(self.op.iallocator)
3539 raise errors.OpPrereqError("Can't compute nodes using"
3540 " iallocator '%s': %s" % (self.op.iallocator,
3542 if len(ial.nodes) != ial.required_nodes:
3543 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3544 " of nodes (%s), required %s" %
3545 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3546 self.op.pnode = ial.nodes[0]
3547 logger.ToStdout("Selected nodes for the instance: %s" %
3548 (", ".join(ial.nodes),))
3549 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3550 (self.op.instance_name, self.op.iallocator, ial.nodes))
3551 if ial.required_nodes == 2:
3552 self.op.snode = ial.nodes[1]
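# In short, _RunAllocator fills in self.op.pnode (and self.op.snode for
# two-node disk templates such as DRBD8) from the iallocator's answer,
# so the rest of CheckPrereq proceeds exactly as if the user had named
# the nodes explicitly.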
3554 def BuildHooksEnv(self):
3557 This runs on master, primary and secondary nodes of the instance.
3561 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3562 "INSTANCE_DISK_SIZE": self.op.disk_size,
3563 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3564 "INSTANCE_ADD_MODE": self.op.mode,
3566 if self.op.mode == constants.INSTANCE_IMPORT:
3567 env["INSTANCE_SRC_NODE"] = self.op.src_node
3568 env["INSTANCE_SRC_PATH"] = self.op.src_path
3569 env["INSTANCE_SRC_IMAGE"] = self.src_image
3571 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3572 primary_node=self.op.pnode,
3573 secondary_nodes=self.secondaries,
3574 status=self.instance_status,
3575 os_type=self.op.os_type,
3576 memory=self.op.mem_size,
3577 vcpus=self.op.vcpus,
3578 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3581 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3586 def CheckPrereq(self):
3587 """Check prerequisites.
3590 # set optional parameters to none if they don't exist
3591 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3592 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3593 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3594 if not hasattr(self.op, attr):
3595 setattr(self.op, attr, None)
3597 if self.op.mode not in (constants.INSTANCE_CREATE,
3598 constants.INSTANCE_IMPORT):
3599 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3602 if self.op.mode == constants.INSTANCE_IMPORT:
3603 src_node = getattr(self.op, "src_node", None)
3604 src_path = getattr(self.op, "src_path", None)
3605 if src_node is None or src_path is None:
3606 raise errors.OpPrereqError("Importing an instance requires source"
3607 " node and path options")
3608 src_node_full = self.cfg.ExpandNodeName(src_node)
3609 if src_node_full is None:
3610 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3611 self.op.src_node = src_node = src_node_full
3613 if not os.path.isabs(src_path):
3614 raise errors.OpPrereqError("The source path must be absolute")
3616 export_info = rpc.call_export_info(src_node, src_path)
3619 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3621 if not export_info.has_section(constants.INISECT_EXP):
3622 raise errors.ProgrammerError("Corrupted export config")
3624 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3625 if (int(ei_version) != constants.EXPORT_VERSION):
3626 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3627 (ei_version, constants.EXPORT_VERSION))
3629 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3630 raise errors.OpPrereqError("Can't import instance with more than"
3633 # FIXME: are the old os-es, disk sizes, etc. useful?
3634 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3635 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3637 self.src_image = diskimage
3638 else: # INSTANCE_CREATE
3639 if getattr(self.op, "os_type", None) is None:
3640 raise errors.OpPrereqError("No guest OS specified")
3642 #### instance parameters check
3644 # disk template and mirror node verification
3645 if self.op.disk_template not in constants.DISK_TEMPLATES:
3646 raise errors.OpPrereqError("Invalid disk template name")
3648 # instance name verification
3649 hostname1 = utils.HostInfo(self.op.instance_name)
3651 self.op.instance_name = instance_name = hostname1.name
3652 instance_list = self.cfg.GetInstanceList()
3653 if instance_name in instance_list:
3654 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3657 # ip validity checks
3658 ip = getattr(self.op, "ip", None)
3659 if ip is None or ip.lower() == "none":
3661 elif ip.lower() == "auto":
3662 inst_ip = hostname1.ip
3664 if not utils.IsValidIP(ip):
3665 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3666 " like a valid IP" % ip)
3668 self.inst_ip = self.op.ip = inst_ip
3670 if self.op.start and not self.op.ip_check:
3671 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3672 " adding an instance in start mode")
3674 if self.op.ip_check:
3675 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3676 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3677 (hostname1.ip, instance_name))
3679 # MAC address verification
3680 if self.op.mac != "auto":
3681 if not utils.IsValidMac(self.op.mac.lower()):
3682 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3685 # bridge verification
3686 bridge = getattr(self.op, "bridge", None)
3688 self.op.bridge = self.cfg.GetDefBridge()
3690 self.op.bridge = bridge
3692 # boot order verification
3693 if self.op.hvm_boot_order is not None:
3694 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3695 raise errors.OpPrereqError("invalid boot order specified,"
3696 " must be one or more of [acdn]")
3699 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3700 raise errors.OpPrereqError("One and only one of iallocator and primary"
3701 " node must be given")
3703 if self.op.iallocator is not None:
3704 self._RunAllocator()
3706 #### node related checks
3708 # check primary node
3709 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3711 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3713 self.op.pnode = pnode.name
3715 self.secondaries = []
3717 # mirror node verification
3718 if self.op.disk_template in constants.DTS_NET_MIRROR:
3719 if getattr(self.op, "snode", None) is None:
3720 raise errors.OpPrereqError("The networked disk templates need"
3723 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3724 if snode_name is None:
3725 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3727 elif snode_name == pnode.name:
3728 raise errors.OpPrereqError("The secondary node cannot be"
3729 " the primary node.")
3730 self.secondaries.append(snode_name)
3732 req_size = _ComputeDiskSize(self.op.disk_template,
3733 self.op.disk_size, self.op.swap_size)
3735 # Check lv size requirements
3736 if req_size is not None:
3737 nodenames = [pnode.name] + self.secondaries
3738 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3739 for node in nodenames:
3740 info = nodeinfo.get(node, None)
3742 raise errors.OpPrereqError("Cannot get current information"
3743 " from node '%s'" % node)
3744 vg_free = info.get('vg_free', None)
3745 if not isinstance(vg_free, int):
3746 raise errors.OpPrereqError("Can't compute free disk space on"
3748 if req_size > vg_free:
3749 raise errors.OpPrereqError("Not enough disk space on target node %s."
3750 " %d MB available, %d MB required" %
3751 (node, info['vg_free'], req_size))
3754 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3756 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3757 " primary node" % self.op.os_type)
3759 if self.op.kernel_path == constants.VALUE_NONE:
3760 raise errors.OpPrereqError("Can't set instance kernel to none")
3763 # bridge check on primary node
3764 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3765 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3766 " destination node '%s'" %
3767 (self.op.bridge, pnode.name))
3769 # memory check on primary node
3771 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3772 "creating instance %s" % self.op.instance_name,
3775 # hvm_cdrom_image_path verification
3776 if self.op.hvm_cdrom_image_path is not None:
3777 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3778 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3779 " be an absolute path or None, not %s" %
3780 self.op.hvm_cdrom_image_path)
3781 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3782 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3783 " regular file or a symlink pointing to"
3784 " an existing regular file, not %s" %
3785 self.op.hvm_cdrom_image_path)
3787 # vnc_bind_address verification
3788 if self.op.vnc_bind_address is not None:
3789 if not utils.IsValidIP(self.op.vnc_bind_address):
3790 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3791 " like a valid IP address" %
3792 self.op.vnc_bind_address)
3794 # Xen HVM device type checks
3795 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3796 if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3797 raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3798 " hypervisor" % self.op.hvm_nic_type)
3799 if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3800 raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3801 " hypervisor" % self.op.hvm_disk_type)
3804 self.instance_status = 'up'
3806 self.instance_status = 'down'
3808 def Exec(self, feedback_fn):
3809 """Create and add the instance to the cluster.
3812 instance = self.op.instance_name
3813 pnode_name = self.pnode.name
3815 if self.op.mac == "auto":
3816 mac_address = self.cfg.GenerateMAC()
3818 mac_address = self.op.mac
3820 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3821 if self.inst_ip is not None:
3822 nic.ip = self.inst_ip
3824 ht_kind = self.sstore.GetHypervisorType()
3825 if ht_kind in constants.HTS_REQ_PORT:
3826 network_port = self.cfg.AllocatePort()
3830 if self.op.vnc_bind_address is None:
3831 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3833 disks = _GenerateDiskTemplate(self.cfg,
3834 self.op.disk_template,
3835 instance, pnode_name,
3836 self.secondaries, self.op.disk_size,
3839 iobj = objects.Instance(name=instance, os=self.op.os_type,
3840 primary_node=pnode_name,
3841 memory=self.op.mem_size,
3842 vcpus=self.op.vcpus,
3843 nics=[nic], disks=disks,
3844 disk_template=self.op.disk_template,
3845 status=self.instance_status,
3846 network_port=network_port,
3847 kernel_path=self.op.kernel_path,
3848 initrd_path=self.op.initrd_path,
3849 hvm_boot_order=self.op.hvm_boot_order,
3850 hvm_acpi=self.op.hvm_acpi,
3851 hvm_pae=self.op.hvm_pae,
3852 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3853 vnc_bind_address=self.op.vnc_bind_address,
3854 hvm_nic_type=self.op.hvm_nic_type,
3855 hvm_disk_type=self.op.hvm_disk_type,
3856 auto_balance=bool(self.op.auto_balance),
3859 feedback_fn("* creating instance disks...")
3860 if not _CreateDisks(self.cfg, iobj):
3861 _RemoveDisks(iobj, self.cfg)
3862 raise errors.OpExecError("Device creation failed, reverting...")
3864 feedback_fn("adding instance %s to cluster config" % instance)
3866 self.cfg.AddInstance(iobj)
3868 if self.op.wait_for_sync:
3869 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3870 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3871 # make sure the disks are not degraded (still sync-ing is ok)
3873 feedback_fn("* checking mirrors status")
3874 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3879 _RemoveDisks(iobj, self.cfg)
3880 self.cfg.RemoveInstance(iobj.name)
3881 raise errors.OpExecError("There are some degraded disks for"
3884 feedback_fn("creating os for instance %s on node %s" %
3885 (instance, pnode_name))
3887 if iobj.disk_template != constants.DT_DISKLESS:
3888 if self.op.mode == constants.INSTANCE_CREATE:
3889 feedback_fn("* running the instance OS create scripts...")
3890 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3891 raise errors.OpExecError("could not add os for instance %s"
3893 (instance, pnode_name))
3895 elif self.op.mode == constants.INSTANCE_IMPORT:
3896 feedback_fn("* running the instance OS import scripts...")
3897 src_node = self.op.src_node
3898 src_image = self.src_image
3899 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3900 src_node, src_image):
3901 raise errors.OpExecError("Could not import os for instance"
3903 (instance, pnode_name))
3905 # also checked in the prereq part
3906 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3910 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3911 feedback_fn("* starting instance...")
3912 if not rpc.call_instance_start(pnode_name, iobj, None):
3913 raise errors.OpExecError("Could not start instance")
3916 class LUConnectConsole(NoHooksLU):
3917 """Connect to an instance's console.
3919 This is somewhat special in that it returns the command line that
3920 you need to run on the master node in order to connect to the console.
3924 _OP_REQP = ["instance_name"]
3926 def CheckPrereq(self):
3927 """Check prerequisites.
3929 This checks that the instance is in the cluster.
3932 instance = self.cfg.GetInstanceInfo(
3933 self.cfg.ExpandInstanceName(self.op.instance_name))
3934 if instance is None:
3935 raise errors.OpPrereqError("Instance '%s' not known" %
3936 self.op.instance_name)
3937 self.instance = instance
3939 def Exec(self, feedback_fn):
3940 """Connect to the console of an instance
3943 instance = self.instance
3944 node = instance.primary_node
3946 node_insts = rpc.call_instance_list([node])[node]
3947 if node_insts is False:
3948 raise errors.OpExecError("Can't connect to node %s." % node)
3950 if instance.name not in node_insts:
3951 raise errors.OpExecError("Instance %s is not running." % instance.name)
3953 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3955 hyper = hypervisor.GetHypervisor()
3956 console_cmd = hyper.GetShellCommandForConsole(instance)
3958 argv = ["ssh", "-q", "-t"]
3959 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3960 argv.extend(ssh.BATCH_MODE_OPTS)
3962 argv.append(console_cmd)
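# The assembled command line is, roughly,
#   ssh -q -t <known-hosts opts> <batch-mode opts> ... <console_cmd>
# i.e. an interactive ssh session that execs the hypervisor-specific
# console command on the instance's primary node.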
3966 class LUAddMDDRBDComponent(LogicalUnit):
3967 """Adda new mirror member to an instance's disk.
3970 HPATH = "mirror-add"
3971 HTYPE = constants.HTYPE_INSTANCE
3972 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3974 def BuildHooksEnv(self):
3977 This runs on the master, the primary and all the secondaries.
3981 "NEW_SECONDARY": self.op.remote_node,
3982 "DISK_NAME": self.op.disk_name,
3984 env.update(_BuildInstanceHookEnvByObject(self.instance))
3985 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3986 self.op.remote_node,] + list(self.instance.secondary_nodes)
3989 def CheckPrereq(self):
3990 """Check prerequisites.
3992 This checks that the instance is in the cluster.
3995 instance = self.cfg.GetInstanceInfo(
3996 self.cfg.ExpandInstanceName(self.op.instance_name))
3997 if instance is None:
3998 raise errors.OpPrereqError("Instance '%s' not known" %
3999 self.op.instance_name)
4000 self.instance = instance
4002 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4003 if remote_node is None:
4004 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
4005 self.remote_node = remote_node
4007 if remote_node == instance.primary_node:
4008 raise errors.OpPrereqError("The specified node is the primary node of"
4011 if instance.disk_template != constants.DT_REMOTE_RAID1:
4012 raise errors.OpPrereqError("Instance's disk layout is not"
4014 for disk in instance.disks:
4015 if disk.iv_name == self.op.disk_name:
4018 raise errors.OpPrereqError("Can't find this device ('%s') in the"
4019 " instance." % self.op.disk_name)
4020 if len(disk.children) > 1:
4021 raise errors.OpPrereqError("The device already has two slave devices."
4022 " This would create a 3-disk raid1 which we"
4026 def Exec(self, feedback_fn):
4027 """Add the mirror component
4031 instance = self.instance
4033 remote_node = self.remote_node
4034 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
4035 names = _GenerateUniqueNames(self.cfg, lv_names)
4036 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
4037 remote_node, disk.size, names)
4039 logger.Info("adding new mirror component on secondary")
4041 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
4043 _GetInstanceInfoText(instance)):
4044 raise errors.OpExecError("Failed to create new component on secondary"
4045 " node %s" % remote_node)
4047 logger.Info("adding new mirror component on primary")
4049 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
4051 _GetInstanceInfoText(instance)):
4052 # remove secondary dev
4053 self.cfg.SetDiskID(new_drbd, remote_node)
4054 rpc.call_blockdev_remove(remote_node, new_drbd)
4055 raise errors.OpExecError("Failed to create volume on primary")
4057 # the device exists now
4058 # call the primary node to add the mirror to md
4059 logger.Info("adding new mirror component to md")
4060 if not rpc.call_blockdev_addchildren(instance.primary_node,
4062 logger.Error("Can't add mirror compoment to md!")
4063 self.cfg.SetDiskID(new_drbd, remote_node)
4064 if not rpc.call_blockdev_remove(remote_node, new_drbd):
4065 logger.Error("Can't rollback on secondary")
4066 self.cfg.SetDiskID(new_drbd, instance.primary_node)
4067 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4068 logger.Error("Can't rollback on primary")
4069 raise errors.OpExecError("Can't add mirror component to md array")
4071 disk.children.append(new_drbd)
4073 self.cfg.AddInstance(instance)
4075 _WaitForSync(self.cfg, instance, self.proc)
4080 class LURemoveMDDRBDComponent(LogicalUnit):
4081 """Remove a component from a remote_raid1 disk.
4084 HPATH = "mirror-remove"
4085 HTYPE = constants.HTYPE_INSTANCE
4086 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
4088 def BuildHooksEnv(self):
4091 This runs on the master, the primary and all the secondaries.
4095 "DISK_NAME": self.op.disk_name,
4096 "DISK_ID": self.op.disk_id,
4097 "OLD_SECONDARY": self.old_secondary,
4099 env.update(_BuildInstanceHookEnvByObject(self.instance))
4100 nl = [self.sstore.GetMasterNode(),
4101 self.instance.primary_node] + list(self.instance.secondary_nodes)
4104 def CheckPrereq(self):
4105 """Check prerequisites.
4107 This checks that the instance is in the cluster.
4110 instance = self.cfg.GetInstanceInfo(
4111 self.cfg.ExpandInstanceName(self.op.instance_name))
4112 if instance is None:
4113 raise errors.OpPrereqError("Instance '%s' not known" %
4114 self.op.instance_name)
4115 self.instance = instance
4117 if instance.disk_template != constants.DT_REMOTE_RAID1:
4118 raise errors.OpPrereqError("Instance's disk layout is not"
4120 for disk in instance.disks:
4121 if disk.iv_name == self.op.disk_name:
4124 raise errors.OpPrereqError("Can't find this device ('%s') in the"
4125 " instance." % self.op.disk_name)
4126 for child in disk.children:
4127 if (child.dev_type == constants.LD_DRBD7 and
4128 child.logical_id[2] == self.op.disk_id):
4131 raise errors.OpPrereqError("Can't find the device with this port.")
4133 if len(disk.children) < 2:
4134 raise errors.OpPrereqError("Cannot remove the last component from"
4138 if self.child.logical_id[0] == instance.primary_node:
4142 self.old_secondary = self.child.logical_id[oid]
4144 def Exec(self, feedback_fn):
4145 """Remove the mirror component
4148 instance = self.instance
4151 logger.Info("remove mirror component")
4152 self.cfg.SetDiskID(disk, instance.primary_node)
4153 if not rpc.call_blockdev_removechildren(instance.primary_node,
4155 raise errors.OpExecError("Can't remove child from mirror.")
4157 for node in child.logical_id[:2]:
4158 self.cfg.SetDiskID(child, node)
4159 if not rpc.call_blockdev_remove(node, child):
4160 logger.Error("Warning: failed to remove device from node %s,"
4161 " continuing operation." % node)
4163 disk.children.remove(child)
4164 self.cfg.AddInstance(instance)
4167 class LUReplaceDisks(LogicalUnit):
4168 """Replace the disks of an instance.
4171 HPATH = "mirrors-replace"
4172 HTYPE = constants.HTYPE_INSTANCE
4173 _OP_REQP = ["instance_name", "mode", "disks"]
4175 def _RunAllocator(self):
4176 """Compute a new secondary node using an IAllocator.
4179 ial = IAllocator(self.cfg, self.sstore,
4180 mode=constants.IALLOCATOR_MODE_RELOC,
4181 name=self.op.instance_name,
4182 relocate_from=[self.sec_node])
4184 ial.Run(self.op.iallocator)
4187 raise errors.OpPrereqError("Can't compute nodes using"
4188 " iallocator '%s': %s" % (self.op.iallocator,
4190 if len(ial.nodes) != ial.required_nodes:
4191 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4192 " of nodes (%s), required %s" %
4193 (len(ial.nodes), ial.required_nodes))
4194 self.op.remote_node = ial.nodes[0]
4195 logger.ToStdout("Selected new secondary for the instance: %s" %
4196 self.op.remote_node)
4198 def BuildHooksEnv(self):
4201 This runs on the master, the primary and all the secondaries.
4205 "MODE": self.op.mode,
4206 "NEW_SECONDARY": self.op.remote_node,
4207 "OLD_SECONDARY": self.instance.secondary_nodes[0],
4209 env.update(_BuildInstanceHookEnvByObject(self.instance))
4211 self.sstore.GetMasterNode(),
4212 self.instance.primary_node,
4214 if self.op.remote_node is not None:
4215 nl.append(self.op.remote_node)
4218 def CheckPrereq(self):
4219 """Check prerequisites.
4221 This checks that the instance is in the cluster.
4224 if not hasattr(self.op, "remote_node"):
4225 self.op.remote_node = None
4227 instance = self.cfg.GetInstanceInfo(
4228 self.cfg.ExpandInstanceName(self.op.instance_name))
4229 if instance is None:
4230 raise errors.OpPrereqError("Instance '%s' not known" %
4231 self.op.instance_name)
4232 self.instance = instance
4233 self.op.instance_name = instance.name
4235 if instance.disk_template not in constants.DTS_NET_MIRROR:
4236 raise errors.OpPrereqError("Instance's disk layout is not"
4237 " network mirrored.")
4239 if len(instance.secondary_nodes) != 1:
4240 raise errors.OpPrereqError("The instance has a strange layout,"
4241 " expected one secondary but found %d" %
4242 len(instance.secondary_nodes))
4244 self.sec_node = instance.secondary_nodes[0]
4246 ia_name = getattr(self.op, "iallocator", None)
4247 if ia_name is not None:
4248 if self.op.remote_node is not None:
4249 raise errors.OpPrereqError("Give either the iallocator or the new"
4250 " secondary, not both")
4251 self._RunAllocator()
4253 remote_node = self.op.remote_node
4254 if remote_node is not None:
4255 remote_node = self.cfg.ExpandNodeName(remote_node)
4256 if remote_node is None:
4257 raise errors.OpPrereqError("Node '%s' not known" %
4258 self.op.remote_node)
4259 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4261 self.remote_node_info = None
4262 if remote_node == instance.primary_node:
4263 raise errors.OpPrereqError("The specified node is the primary node of"
4265 elif remote_node == self.sec_node:
4266 if self.op.mode == constants.REPLACE_DISK_SEC:
4267 # this is for DRBD8, where we can't execute the same mode of
4268 # replacement as for drbd7 (no different port allocated)
4269 raise errors.OpPrereqError("Same secondary given, cannot execute"
4271 # the user gave the current secondary, switch to
4272 # 'no-replace-secondary' mode for drbd7
4274 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
4275 self.op.mode != constants.REPLACE_DISK_ALL):
4276 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
4277 " disks replacement, not individual ones")
4278 if instance.disk_template == constants.DT_DRBD8:
4279 if (self.op.mode == constants.REPLACE_DISK_ALL and
4280 remote_node is not None):
4281 # switch to replace secondary mode
4282 self.op.mode = constants.REPLACE_DISK_SEC
4284 if self.op.mode == constants.REPLACE_DISK_ALL:
4285 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4286 " secondary disk replacement, not"
4288 elif self.op.mode == constants.REPLACE_DISK_PRI:
4289 if remote_node is not None:
4290 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4291 " the secondary while doing a primary"
4292 " node disk replacement")
4293 self.tgt_node = instance.primary_node
4294 self.oth_node = instance.secondary_nodes[0]
4295 elif self.op.mode == constants.REPLACE_DISK_SEC:
4296 self.new_node = remote_node # this can be None, in which case
4297 # we don't change the secondary
4298 self.tgt_node = instance.secondary_nodes[0]
4299 self.oth_node = instance.primary_node
4301 raise errors.ProgrammerError("Unhandled disk replace mode")
4303 for name in self.op.disks:
4304 if instance.FindDisk(name) is None:
4305 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4306 (name, instance.name))
4307 self.op.remote_node = remote_node
4309 def _ExecRR1(self, feedback_fn):
4310 """Replace the disks of an instance.
4313 instance = self.instance
4316 if self.op.remote_node is None:
4317 remote_node = self.sec_node
4319 remote_node = self.op.remote_node
4321 for dev in instance.disks:
4323 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4324 names = _GenerateUniqueNames(cfg, lv_names)
4325 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
4326 remote_node, size, names)
4327 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
4328 logger.Info("adding new mirror component on secondary for %s" %
4331 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
4333 _GetInstanceInfoText(instance)):
4334 raise errors.OpExecError("Failed to create new component on secondary"
4335 " node %s. Full abort, cleanup manually!" %
4338 logger.Info("adding new mirror component on primary")
4340 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
4342 _GetInstanceInfoText(instance)):
4343 # remove secondary dev
4344 cfg.SetDiskID(new_drbd, remote_node)
4345 rpc.call_blockdev_remove(remote_node, new_drbd)
4346 raise errors.OpExecError("Failed to create volume on primary!"
4347 " Full abort, cleanup manually!!")
4349 # the device exists now
4350 # call the primary node to add the mirror to md
4351 logger.Info("adding new mirror component to md")
4352 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
4354 logger.Error("Can't add mirror compoment to md!")
4355 cfg.SetDiskID(new_drbd, remote_node)
4356 if not rpc.call_blockdev_remove(remote_node, new_drbd):
4357 logger.Error("Can't rollback on secondary")
4358 cfg.SetDiskID(new_drbd, instance.primary_node)
4359 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4360 logger.Error("Can't rollback on primary")
4361 raise errors.OpExecError("Full abort, cleanup manually!!")
4363 dev.children.append(new_drbd)
4364 cfg.AddInstance(instance)
4366 # this can fail as the old devices are degraded and _WaitForSync
4367 # does a combined result over all disks, so we don't check its
4369 _WaitForSync(cfg, instance, self.proc, unlock=True)
4371 # so check manually all the devices
4372 for name in iv_names:
4373 dev, child, new_drbd = iv_names[name]
4374 cfg.SetDiskID(dev, instance.primary_node)
4375 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4377 raise errors.OpExecError("MD device %s is degraded!" % name)
4378 cfg.SetDiskID(new_drbd, instance.primary_node)
4379 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
4381 raise errors.OpExecError("New drbd device %s is degraded!" % name)
4383 for name in iv_names:
4384 dev, child, new_drbd = iv_names[name]
4385 logger.Info("remove mirror %s component" % name)
4386 cfg.SetDiskID(dev, instance.primary_node)
4387 if not rpc.call_blockdev_removechildren(instance.primary_node,
4389 logger.Error("Can't remove child from mirror, aborting"
4390 " *this device cleanup*.\nYou need to cleanup manually!!")
4393 for node in child.logical_id[:2]:
4394 logger.Info("remove child device on %s" % node)
4395 cfg.SetDiskID(child, node)
4396 if not rpc.call_blockdev_remove(node, child):
4397 logger.Error("Warning: failed to remove device from node %s,"
4398 " continuing operation." % node)
4400 dev.children.remove(child)
4402 cfg.AddInstance(instance)
4404 def _ExecD8DiskOnly(self, feedback_fn):
4405 """Replace a disk on the primary or secondary for dbrd8.
4407 The algorithm for replace is quite complicated:
4408 - for each disk to be replaced:
4409 - create new LVs on the target node with unique names
4410 - detach old LVs from the drbd device
4411 - rename old LVs to name_replaced.<time_t>
4412 - rename new LVs to old LVs
4413 - attach the new LVs (with the old names now) to the drbd device
4414 - wait for sync across all devices
4415 - for each modified disk:
4416 - remove old LVs (which have the name name_replaces.<time_t>)
4418 Failures are not very well handled.
4422 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4423 instance = self.instance
4425 vgname = self.cfg.GetVGName()
4428 tgt_node = self.tgt_node
4429 oth_node = self.oth_node
4431 # Step: check device activation
4432 self.proc.LogStep(1, steps_total, "check device existence")
4433 info("checking volume groups")
4434 my_vg = cfg.GetVGName()
4435 results = rpc.call_vg_list([oth_node, tgt_node])
4437 raise errors.OpExecError("Can't list volume groups on the nodes")
4438 for node in oth_node, tgt_node:
4439 res = results.get(node, False)
4440 if not res or my_vg not in res:
4441 raise errors.OpExecError("Volume group '%s' not found on %s" %
4443 for dev in instance.disks:
4444 if not dev.iv_name in self.op.disks:
4446 for node in tgt_node, oth_node:
4447 info("checking %s on %s" % (dev.iv_name, node))
4448 cfg.SetDiskID(dev, node)
4449 if not rpc.call_blockdev_find(node, dev):
4450 raise errors.OpExecError("Can't find device %s on node %s" %
4451 (dev.iv_name, node))
4453 # Step: check other node consistency
4454 self.proc.LogStep(2, steps_total, "check peer consistency")
4455 for dev in instance.disks:
4456 if not dev.iv_name in self.op.disks:
4458 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4459 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4460 oth_node==instance.primary_node):
4461 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4462 " to replace disks on this node (%s)" %
4463 (oth_node, tgt_node))
4465 # Step: create new storage
4466 self.proc.LogStep(3, steps_total, "allocate new storage")
4467 for dev in instance.disks:
4468 if not dev.iv_name in self.op.disks:
4471 cfg.SetDiskID(dev, tgt_node)
4472 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4473 names = _GenerateUniqueNames(cfg, lv_names)
4474 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4475 logical_id=(vgname, names[0]))
4476 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4477 logical_id=(vgname, names[1]))
4478 new_lvs = [lv_data, lv_meta]
4479 old_lvs = dev.children
4480 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4481 info("creating new local storage on %s for %s" %
4482 (tgt_node, dev.iv_name))
4483 # since we *always* want to create this LV, we use the
4484 # _Create...OnPrimary (which forces the creation), even if we
4485 # are talking about the secondary node
4486 for new_lv in new_lvs:
4487 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4488 _GetInstanceInfoText(instance)):
4489 raise errors.OpExecError("Failed to create new LV named '%s' on"
4491 (new_lv.logical_id[1], tgt_node))
4493 # Step: for each lv, detach+rename*2+attach
4494 self.proc.LogStep(4, steps_total, "change drbd configuration")
4495 for dev, old_lvs, new_lvs in iv_names.itervalues():
4496 info("detaching %s drbd from local storage" % dev.iv_name)
4497 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4498 raise errors.OpExecError("Can't detach drbd from local storage on node"
4499 " %s for device %s" % (tgt_node, dev.iv_name))
4501 #cfg.Update(instance)
4503 # ok, we created the new LVs, so now we know we have the needed
4504 # storage; as such, we proceed on the target node to rename
4505 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4506 # using the assumption that logical_id == physical_id (which in
4507 # turn is the unique_id on that node)
4509 # FIXME(iustin): use a better name for the replaced LVs
4510 temp_suffix = int(time.time())
4511 ren_fn = lambda d, suff: (d.physical_id[0],
4512 d.physical_id[1] + "_replaced-%s" % suff)
4513 # build the rename list based on what LVs exist on the node
4515 for to_ren in old_lvs:
4516 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4517 if find_res is not None: # device exists
4518 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4520 info("renaming the old LVs on the target node")
4521 if not rpc.call_blockdev_rename(tgt_node, rlist):
4522 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4523 # now we rename the new LVs to the old LVs
4524 info("renaming the new LVs on the target node")
4525 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4526 if not rpc.call_blockdev_rename(tgt_node, rlist):
4527 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4529 for old, new in zip(old_lvs, new_lvs):
4530 new.logical_id = old.logical_id
4531 cfg.SetDiskID(new, tgt_node)
4533 for disk in old_lvs:
4534 disk.logical_id = ren_fn(disk, temp_suffix)
4535 cfg.SetDiskID(disk, tgt_node)
4537 # now that the new lvs have the old name, we can add them to the device
4538 info("adding new mirror component on %s" % tgt_node)
4539 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4540 for new_lv in new_lvs:
4541 if not rpc.call_blockdev_remove(tgt_node, new_lv):
4542 warning("Can't rollback device %s", hint="manually cleanup unused"
4544 raise errors.OpExecError("Can't add local storage to drbd")
4546 dev.children = new_lvs
4547 cfg.Update(instance)
4549 # Step: wait for sync
4551 # this can fail as the old devices are degraded and _WaitForSync
4552 # does a combined result over all disks, so we don't check its
4554 self.proc.LogStep(5, steps_total, "sync devices")
4555 _WaitForSync(cfg, instance, self.proc, unlock=True)
4557 # so check manually all the devices
4558 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4559 cfg.SetDiskID(dev, instance.primary_node)
4560 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4562 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4564 # Step: remove old storage
4565 self.proc.LogStep(6, steps_total, "removing old storage")
4566 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4567 info("remove logical volumes for %s" % name)
4569 cfg.SetDiskID(lv, tgt_node)
4570 if not rpc.call_blockdev_remove(tgt_node, lv):
4571 warning("Can't remove old LV", hint="manually remove unused LVs")
4574 def _ExecD8Secondary(self, feedback_fn):
4575 """Replace the secondary node for drbd8.
4577 The algorithm for replace is quite complicated:
4578 - for all disks of the instance:
4579 - create new LVs on the new node with same names
4580 - shutdown the drbd device on the old secondary
4581 - disconnect the drbd network on the primary
4582 - create the drbd device on the new secondary
4583 - network attach the drbd on the primary, using an artifice:
4584 the drbd code for Attach() will connect to the network if it
4585 finds a device which is connected to the good local disks but
4587 - wait for sync across all devices
4588 - remove all disks from the old secondary
4590 Failures are not very well handled.
4594 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4595 instance = self.instance
4597 vgname = self.cfg.GetVGName()
4600 old_node = self.tgt_node
4601 new_node = self.new_node
4602 pri_node = instance.primary_node
4604 # Step: check device activation
4605 self.proc.LogStep(1, steps_total, "check device existence")
4606 info("checking volume groups")
4607 my_vg = cfg.GetVGName()
4608 results = rpc.call_vg_list([pri_node, new_node])
4610 raise errors.OpExecError("Can't list volume groups on the nodes")
4611 for node in pri_node, new_node:
4612 res = results.get(node, False)
4613 if not res or my_vg not in res:
4614 raise errors.OpExecError("Volume group '%s' not found on %s" %
4616 for dev in instance.disks:
4617 if not dev.iv_name in self.op.disks:
4619 info("checking %s on %s" % (dev.iv_name, pri_node))
4620 cfg.SetDiskID(dev, pri_node)
4621 if not rpc.call_blockdev_find(pri_node, dev):
4622 raise errors.OpExecError("Can't find device %s on node %s" %
4623 (dev.iv_name, pri_node))
4625 # Step: check other node consistency
4626 self.proc.LogStep(2, steps_total, "check peer consistency")
4627 for dev in instance.disks:
4628 if not dev.iv_name in self.op.disks:
4630 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4631 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4632 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4633 " unsafe to replace the secondary" %
4636 # Step: create new storage
4637 self.proc.LogStep(3, steps_total, "allocate new storage")
4638 for dev in instance.disks:
4640 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4641 # since we *always* want to create this LV, we use the
4642 # _Create...OnPrimary (which forces the creation), even if we
4643 # are talking about the secondary node
4644 for new_lv in dev.children:
4645 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4646 _GetInstanceInfoText(instance)):
4647 raise errors.OpExecError("Failed to create new LV named '%s' on"
4649 (new_lv.logical_id[1], new_node))
4651 iv_names[dev.iv_name] = (dev, dev.children)
4653 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4654 for dev in instance.disks:
4656 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4657 # create new devices on new_node
4658 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4659 logical_id=(pri_node, new_node,
4661 children=dev.children)
4662 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4664 _GetInstanceInfoText(instance)):
4665 raise errors.OpExecError("Failed to create new DRBD on"
4666 " node '%s'" % new_node)
4668 for dev in instance.disks:
4669 # we have new devices, shutdown the drbd on the old secondary
4670 info("shutting down drbd for %s on old node" % dev.iv_name)
4671 cfg.SetDiskID(dev, old_node)
4672 if not rpc.call_blockdev_shutdown(old_node, dev):
4673 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4674 hint="Please cleanup this device manually as soon as possible")
4676 info("detaching primary drbds from the network (=> standalone)")
4678 for dev in instance.disks:
4679 cfg.SetDiskID(dev, pri_node)
4680 # set the physical (unique in bdev terms) id to None, meaning
4681 # detach from network
4682 dev.physical_id = (None,) * len(dev.physical_id)
4683 # and 'find' the device, which will 'fix' it to match the
4685 if rpc.call_blockdev_find(pri_node, dev):
4688 warning("Failed to detach drbd %s from network, unusual case" %
4692 # no detaches succeeded (very unlikely)
4693 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4695 # if we managed to detach at least one, we update all the disks of
4696 # the instance to point to the new secondary
4697 info("updating instance configuration")
4698 for dev in instance.disks:
4699 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4700 cfg.SetDiskID(dev, pri_node)
4701 cfg.Update(instance)
4703 # and now perform the drbd attach
4704 info("attaching primary drbds to new secondary (standalone => connected)")
4706 for dev in instance.disks:
4707 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4708 # since the attach is smart, it's enough to 'find' the device,
4709 # it will automatically activate the network, if the physical_id
4711 cfg.SetDiskID(dev, pri_node)
4712 if not rpc.call_blockdev_find(pri_node, dev):
4713 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4714 "please do a gnt-instance info to see the status of disks")
4716 # this can fail as the old devices are degraded and _WaitForSync
4717 # does a combined result over all disks, so we don't check its
4719 self.proc.LogStep(5, steps_total, "sync devices")
4720 _WaitForSync(cfg, instance, self.proc, unlock=True)
4722 # so check manually all the devices
4723 for name, (dev, old_lvs) in iv_names.iteritems():
4724 cfg.SetDiskID(dev, pri_node)
4725 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4727 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4729 self.proc.LogStep(6, steps_total, "removing old storage")
4730 for name, (dev, old_lvs) in iv_names.iteritems():
4731 info("remove logical volumes for %s" % name)
4733 cfg.SetDiskID(lv, old_node)
4734 if not rpc.call_blockdev_remove(old_node, lv):
4735 warning("Can't remove LV on old secondary",
4736 hint="Cleanup stale volumes by hand")
4738 def Exec(self, feedback_fn):
4739 """Execute disk replacement.
4741 This dispatches the disk replacement to the appropriate handler.
4744 instance = self.instance
4746 # Activate the instance disks if we're replacing them on a down instance
4747 if instance.status == "down":
4748 op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
4749 self.proc.ChainOpCode(op)
4751 if instance.disk_template == constants.DT_REMOTE_RAID1:
4753 elif instance.disk_template == constants.DT_DRBD8:
4754 if self.op.remote_node is None:
4755 fn = self._ExecD8DiskOnly
4757 fn = self._ExecD8Secondary
4759 raise errors.ProgrammerError("Unhandled disk replacement case")
4761 ret = fn(feedback_fn)
4763 # Deactivate the instance disks if we're replacing them on a down instance
4764 if instance.status == "down":
4765 op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
4766 self.proc.ChainOpCode(op)
4771 class LUGrowDisk(LogicalUnit):
4772 """Grow a disk of an instance.
4776 HTYPE = constants.HTYPE_INSTANCE
4777 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4779 def BuildHooksEnv(self):
4782 This runs on the master, the primary and all the secondaries.
4786 "DISK": self.op.disk,
4787 "AMOUNT": self.op.amount,
4789 env.update(_BuildInstanceHookEnvByObject(self.instance))
4791 self.sstore.GetMasterNode(),
4792 self.instance.primary_node,
4796 def CheckPrereq(self):
4797 """Check prerequisites.
4799 This checks that the instance is in the cluster.
4802 instance = self.cfg.GetInstanceInfo(
4803 self.cfg.ExpandInstanceName(self.op.instance_name))
4804 if instance is None:
4805 raise errors.OpPrereqError("Instance '%s' not known" %
4806 self.op.instance_name)
4808 if self.op.amount <= 0:
4809 raise errors.OpPrereqError("Invalid grow-by amount: %s" % self.op.amount)
4811 self.instance = instance
4812 self.op.instance_name = instance.name
4814 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4815 raise errors.OpPrereqError("Instance's disk layout does not support"
4818 self.disk = instance.FindDisk(self.op.disk)
4819 if self.disk is None:
4820 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4821 (self.op.disk, instance.name))
4823 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4824 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4825 for node in nodenames:
4826 info = nodeinfo.get(node, None)
4828 raise errors.OpPrereqError("Cannot get current information"
4829 " from node '%s'" % node)
4830 vg_free = info.get('vg_free', None)
4831 if not isinstance(vg_free, int):
4832 raise errors.OpPrereqError("Can't compute free disk space on"
4834 if self.op.amount > info['vg_free']:
4835 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4836 " %d MiB available, %d MiB required" %
4837 (node, info['vg_free'], self.op.amount))
4838 is_primary = (node == instance.primary_node)
4839 if not _CheckDiskConsistency(self.cfg, self.disk, node, is_primary):
4840 raise errors.OpPrereqError("Disk %s is degraded or not fully"
4841 " synchronized on node %s,"
4842 " aborting grow." % (self.op.disk, node))
4844 def Exec(self, feedback_fn):
4845 """Execute disk grow.
4848 instance = self.instance
4850 for node in (instance.secondary_nodes + (instance.primary_node,)):
4851 self.cfg.SetDiskID(disk, node)
4852 result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4853 if not result or not isinstance(result, tuple) or len(result) != 2:
4854 raise errors.OpExecError("grow request failed to node %s" % node)
4856 raise errors.OpExecError("grow request failed to node %s: %s" %
4858 disk.RecordGrow(self.op.amount)
4859 self.cfg.Update(instance)
4860 if self.op.wait_for_sync:
4861 disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4863 logger.Error("Warning: disk sync-ing has not returned a good status.\n"
4864 " Please check the instance.")
4867 class LUQueryInstanceData(NoHooksLU):
4868 """Query runtime instance data.
4871 _OP_REQP = ["instances", "static"]
4873 def CheckPrereq(self):
4874 """Check prerequisites.
4876 This only checks the optional instance list against the existing names.
4879 if not isinstance(self.op.instances, list):
4880 raise errors.OpPrereqError("Invalid argument type 'instances'")
4881 if self.op.instances:
4882 self.wanted_instances = []
4883 names = self.op.instances
4885 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4886 if instance is None:
4887 raise errors.OpPrereqError("No such instance name '%s'" % name)
4888 self.wanted_instances.append(instance)
4890 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4891 in self.cfg.GetInstanceList()]
4895 def _ComputeDiskStatus(self, instance, snode, dev):
4896 """Compute block device status.
4899 static = self.op.static
4901 self.cfg.SetDiskID(dev, instance.primary_node)
4902 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4906 if dev.dev_type in constants.LDS_DRBD:
4907 # we change the snode then (otherwise we use the one passed in)
4908 if dev.logical_id[0] == instance.primary_node:
4909 snode = dev.logical_id[1]
4911 snode = dev.logical_id[0]
4913 if snode and not static:
4914 self.cfg.SetDiskID(dev, snode)
4915 dev_sstatus = rpc.call_blockdev_find(snode, dev)
4920 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4921 for child in dev.children]
4926 "iv_name": dev.iv_name,
4927 "dev_type": dev.dev_type,
4928 "logical_id": dev.logical_id,
4929 "physical_id": dev.physical_id,
4930 "pstatus": dev_pstatus,
4931 "sstatus": dev_sstatus,
4932 "children": dev_children,
4937 def Exec(self, feedback_fn):
4938 """Gather and return data"""
4940 for instance in self.wanted_instances:
4941 if not self.op.static:
4942 remote_info = rpc.call_instance_info(instance.primary_node,
4944 if remote_info and "state" in remote_info:
4947 remote_state = "down"
4950 if instance.status == "down":
4951 config_state = "down"
4955 disks = [self._ComputeDiskStatus(instance, None, device)
4956 for device in instance.disks]
4959 "name": instance.name,
4960 "config_state": config_state,
4961 "run_state": remote_state,
4962 "pnode": instance.primary_node,
4963 "snodes": instance.secondary_nodes,
4965 "memory": instance.memory,
4966 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4968 "vcpus": instance.vcpus,
4969 "auto_balance": instance.auto_balance,
4972 htkind = self.sstore.GetHypervisorType()
4973 if htkind == constants.HT_XEN_PVM30:
4974 idict["kernel_path"] = instance.kernel_path
4975 idict["initrd_path"] = instance.initrd_path
4977 if htkind == constants.HT_XEN_HVM31:
4978 idict["hvm_boot_order"] = instance.hvm_boot_order
4979 idict["hvm_acpi"] = instance.hvm_acpi
4980 idict["hvm_pae"] = instance.hvm_pae
4981 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4982 idict["hvm_nic_type"] = instance.hvm_nic_type
4983 idict["hvm_disk_type"] = instance.hvm_disk_type
4985 if htkind in constants.HTS_REQ_PORT:
4986 if instance.network_port is None:
4987 vnc_console_port = None
4988 elif instance.vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4989 vnc_console_port = "%s:%s" % (instance.primary_node,
4990 instance.network_port)
4991 elif instance.vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4992 vnc_console_port = "%s:%s on node %s" % (instance.vnc_bind_address,
4993 instance.network_port,
4994 instance.primary_node)
4996 vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4997 instance.network_port)
4998 idict["vnc_console_port"] = vnc_console_port
4999 idict["vnc_bind_address"] = instance.vnc_bind_address
5000 idict["network_port"] = instance.network_port
5002 result[instance.name] = idict
5007 class LUSetInstanceParms(LogicalUnit):
5008 """Modifies an instances's parameters.
5011 HPATH = "instance-modify"
5012 HTYPE = constants.HTYPE_INSTANCE
5013 _OP_REQP = ["instance_name"]
5015 def BuildHooksEnv(self):
5018 This runs on the master, primary and secondaries.
5023 args['memory'] = self.mem
5025 args['vcpus'] = self.vcpus
5026 if self.do_ip or self.do_bridge or self.mac:
5030 ip = self.instance.nics[0].ip
5032 bridge = self.bridge
5034 bridge = self.instance.nics[0].bridge
5038 mac = self.instance.nics[0].mac
5039 args['nics'] = [(ip, bridge, mac)]
5040 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
5041 nl = [self.sstore.GetMasterNode(),
5042 self.instance.primary_node] + list(self.instance.secondary_nodes)
5045 def CheckPrereq(self):
5046 """Check prerequisites.
5048 This only checks the instance list against the existing names.
5051 self.mem = getattr(self.op, "mem", None)
5052 self.vcpus = getattr(self.op, "vcpus", None)
5053 self.ip = getattr(self.op, "ip", None)
5054 self.mac = getattr(self.op, "mac", None)
5055 self.bridge = getattr(self.op, "bridge", None)
5056 self.kernel_path = getattr(self.op, "kernel_path", None)
5057 self.initrd_path = getattr(self.op, "initrd_path", None)
5058 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
5059 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
5060 self.hvm_pae = getattr(self.op, "hvm_pae", None)
5061 self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
5062 self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
5063 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
5064 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
5065 self.force = getattr(self.op, "force", None)
5066 self.auto_balance = getattr(self.op, "auto_balance", None)
5068 self.mem, self.vcpus, self.ip, self.bridge, self.mac,
5069 self.kernel_path, self.initrd_path, self.hvm_boot_order,
5070 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
5071 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type,
5074 if all_parms.count(None) == len(all_parms):
5075 raise errors.OpPrereqError("No changes submitted")
5076 if self.mem is not None:
5078 self.mem = int(self.mem)
5079 except ValueError, err:
5080 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
5081 if self.vcpus is not None:
5083 self.vcpus = int(self.vcpus)
5084 except ValueError, err:
5085 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
5086 if self.ip is not None:
5088 if self.ip.lower() == "none":
5091 if not utils.IsValidIP(self.ip):
5092 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
5095 self.do_bridge = (self.bridge is not None)
5096 if self.mac is not None:
5097 if self.cfg.IsMacInUse(self.mac):
5098 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
5100 if not utils.IsValidMac(self.mac):
5101 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
5103 if self.kernel_path is not None:
5104 self.do_kernel_path = True
5105 if self.kernel_path == constants.VALUE_NONE:
5106 raise errors.OpPrereqError("Can't set instance to no kernel")
5108 if self.kernel_path != constants.VALUE_DEFAULT:
5109 if not os.path.isabs(self.kernel_path):
5110 raise errors.OpPrereqError("The kernel path must be an absolute"
5113 self.do_kernel_path = False
5115 if self.initrd_path is not None:
5116 self.do_initrd_path = True
5117 if self.initrd_path not in (constants.VALUE_NONE,
5118 constants.VALUE_DEFAULT):
5119 if not os.path.isabs(self.initrd_path):
5120 raise errors.OpPrereqError("The initrd path must be an absolute"
5123 self.do_initrd_path = False
5125 # boot order verification
5126 if self.hvm_boot_order is not None:
5127 if self.hvm_boot_order != constants.VALUE_DEFAULT:
5128 if len(self.hvm_boot_order.strip("acdn")) != 0:
5129 raise errors.OpPrereqError("invalid boot order specified,"
5130 " must be one or more of [acdn]"
5133 # hvm_cdrom_image_path verification
5134 if self.op.hvm_cdrom_image_path is not None:
5135 if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
5136 self.op.hvm_cdrom_image_path.lower() == "none"):
5137 raise errors.OpPrereqError("The path to the HVM CDROM image must"
5138 " be an absolute path or None, not %s" %
5139 self.op.hvm_cdrom_image_path)
5140 if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
5141 self.op.hvm_cdrom_image_path.lower() == "none"):
5142 raise errors.OpPrereqError("The HVM CDROM image must either be a"
5143 " regular file or a symlink pointing to"
5144 " an existing regular file, not %s" %
5145 self.op.hvm_cdrom_image_path)
5147 # vnc_bind_address verification
5148 if self.op.vnc_bind_address is not None:
5149 if not utils.IsValidIP(self.op.vnc_bind_address):
5150 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
5151 " like a valid IP address" %
5152 self.op.vnc_bind_address)
5154 # Xen HVM device type checks
5155 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
5156 if self.op.hvm_nic_type is not None:
5157 if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
5158 raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
5159 " HVM hypervisor" % self.op.hvm_nic_type)
5160 if self.op.hvm_disk_type is not None:
5161 if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
5162 raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
5163 " HVM hypervisor" % self.op.hvm_disk_type)
5165 # auto balance setting
5166 if self.auto_balance is not None:
5167 # convert the value to a proper bool value, if it's not
5168 self.auto_balance = bool(self.auto_balance)
5170 instance = self.cfg.GetInstanceInfo(
5171 self.cfg.ExpandInstanceName(self.op.instance_name))
5172 if instance is None:
5173 raise errors.OpPrereqError("No such instance name '%s'" %
5174 self.op.instance_name)
5175 self.op.instance_name = instance.name
5176 self.instance = instance
5178 if self.mem is not None and not self.force:
5179 pnode = self.instance.primary_node
5181 if instance.auto_balance:
5182 nodelist.extend(instance.secondary_nodes)
5183 instance_info = rpc.call_instance_info(pnode, instance.name)
5184 nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
5186 if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
5187 # Assume the primary node is unreachable and go ahead
5188 self.warn.append("Can't get info from primary node %s" % pnode)
5191 current_mem = instance_info['memory']
5193 # Assume instance not running
5194 # (there is a slight race condition here, but it's not very probable,
5195 # and we have no other way to check)
5197 miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
5199 raise errors.OpPrereqError("This change will prevent the instance"
5200 " from starting, due to %d MB of memory"
5201 " missing on its primary node" % miss_mem)
5203 if instance.auto_balance:
5204 for node in instance.secondary_nodes:
5205 if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
5206 self.warn.append("Can't get info from secondary node %s" % node)
5207 elif self.mem > nodeinfo[node]['memory_free']:
5208 self.warn.append("Not enough memory to failover instance to"
5209 " secondary node %s" % node)
5212 def Exec(self, feedback_fn):
5213 """Modifies an instance.
5215 All parameters take effect only at the next restart of the instance.
5217 # Process here the warnings from CheckPrereq, as we don't have a
5218 # feedback_fn there.
5219 for warn in self.warn:
5220 feedback_fn("WARNING: %s" % warn)
5223 instance = self.instance
5225 instance.memory = self.mem
5226 result.append(("mem", self.mem))
5228 instance.vcpus = self.vcpus
5229 result.append(("vcpus", self.vcpus))
5231 instance.nics[0].ip = self.ip
5232 result.append(("ip", self.ip))
5234 instance.nics[0].bridge = self.bridge
5235 result.append(("bridge", self.bridge))
5237 instance.nics[0].mac = self.mac
5238 result.append(("mac", self.mac))
5239 if self.do_kernel_path:
5240 instance.kernel_path = self.kernel_path
5241 result.append(("kernel_path", self.kernel_path))
5242 if self.do_initrd_path:
5243 instance.initrd_path = self.initrd_path
5244 result.append(("initrd_path", self.initrd_path))
5245 if self.hvm_boot_order:
5246 if self.hvm_boot_order == constants.VALUE_DEFAULT:
5247 instance.hvm_boot_order = None
5249 instance.hvm_boot_order = self.hvm_boot_order
5250 result.append(("hvm_boot_order", self.hvm_boot_order))
5251 if self.hvm_acpi is not None:
5252 instance.hvm_acpi = self.hvm_acpi
5253 result.append(("hvm_acpi", self.hvm_acpi))
5254 if self.hvm_pae is not None:
5255 instance.hvm_pae = self.hvm_pae
5256 result.append(("hvm_pae", self.hvm_pae))
5257 if self.hvm_nic_type is not None:
5258 instance.hvm_nic_type = self.hvm_nic_type
5259 result.append(("hvm_nic_type", self.hvm_nic_type))
5260 if self.hvm_disk_type is not None:
5261 instance.hvm_disk_type = self.hvm_disk_type
5262 result.append(("hvm_disk_type", self.hvm_disk_type))
5263 if self.hvm_cdrom_image_path:
5264 if self.hvm_cdrom_image_path == constants.VALUE_NONE:
5265 instance.hvm_cdrom_image_path = None
5267 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
5268 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
5269 if self.vnc_bind_address:
5270 instance.vnc_bind_address = self.vnc_bind_address
5271 result.append(("vnc_bind_address", self.vnc_bind_address))
5272 if self.auto_balance is not None:
5273 instance.auto_balance = self.auto_balance
5274 result.append(("auto_balance", self.auto_balance))
5276 self.cfg.AddInstance(instance)
5281 class LUQueryExports(NoHooksLU):
5282 """Query the exports list
5287 def CheckPrereq(self):
5288 """Check that the nodelist contains only existing nodes.
5291 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
5293 def Exec(self, feedback_fn):
5294 """Compute the list of all the exported system images.
5297 a dictionary with the structure node->(export-list)
5298 where export-list is a list of the instances exported on
5302 return rpc.call_export_list(self.nodes)
5305 class LUExportInstance(LogicalUnit):
5306 """Export an instance to an image in the cluster.
5309 HPATH = "instance-export"
5310 HTYPE = constants.HTYPE_INSTANCE
5311 _OP_REQP = ["instance_name", "target_node", "shutdown"]
5313 def BuildHooksEnv(self):
5316 This will run on the master, primary node and target node.
5320 "EXPORT_NODE": self.op.target_node,
5321 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5323 env.update(_BuildInstanceHookEnvByObject(self.instance))
5324 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
5325 self.op.target_node]
5328 def CheckPrereq(self):
5329 """Check prerequisites.
5331 This checks that the instance and node names are valid.
5334 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5335 self.instance = self.cfg.GetInstanceInfo(instance_name)
5336 if self.instance is None:
5337 raise errors.OpPrereqError("Instance '%s' not found" %
5338 self.op.instance_name)
5341 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
5342 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
5344 if self.dst_node is None:
5345 raise errors.OpPrereqError("Destination node '%s' is unknown." %
5346 self.op.target_node)
5347 self.op.target_node = self.dst_node.name
5349 def Exec(self, feedback_fn):
5350 """Export an instance to an image in the cluster.
5353 instance = self.instance
5354 dst_node = self.dst_node
5355 src_node = instance.primary_node
5356 if self.op.shutdown:
5357 # shutdown the instance, but not the disks
5358 if not rpc.call_instance_shutdown(src_node, instance):
5359 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5360 (instance.name, src_node))
5362 vgname = self.cfg.GetVGName()
5367 for disk in instance.disks:
5368 if disk.iv_name == "sda":
5369 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5370 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
5372 if not new_dev_name:
5373 logger.Error("could not snapshot block device %s on node %s" %
5374 (disk.logical_id[1], src_node))
5376 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5377 logical_id=(vgname, new_dev_name),
5378 physical_id=(vgname, new_dev_name),
5379 iv_name=disk.iv_name)
5380 snap_disks.append(new_dev)
5383 if self.op.shutdown and instance.status == "up":
5384 if not rpc.call_instance_start(src_node, instance, None):
5385 _ShutdownInstanceDisks(instance, self.cfg)
5386 raise errors.OpExecError("Could not start instance")
5388 # TODO: check for size
5390 for dev in snap_disks:
5391 if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
5393 logger.Error("could not export block device %s from node"
5395 (dev.logical_id[1], src_node, dst_node.name))
5396 if not rpc.call_blockdev_remove(src_node, dev):
5397 logger.Error("could not remove snapshot block device %s from"
5398 " node %s" % (dev.logical_id[1], src_node))
5400 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5401 logger.Error("could not finalize export for instance %s on node %s" %
5402 (instance.name, dst_node.name))
5404 nodelist = self.cfg.GetNodeList()
5405 nodelist.remove(dst_node.name)
5407 # on one-node clusters nodelist will be empty after the removal
5408 # if we proceed the backup would be removed because OpQueryExports
5409 # substitutes an empty list with the full cluster node list.
5411 op = opcodes.OpQueryExports(nodes=nodelist)
5412 exportlist = self.proc.ChainOpCode(op)
5413 for node in exportlist:
5414 if instance.name in exportlist[node]:
5415 if not rpc.call_export_remove(node, instance.name):
5416 logger.Error("could not remove older export for instance %s"
5417 " on node %s" % (instance.name, node))
5420 class LURemoveExport(NoHooksLU):
5421 """Remove exports related to the named instance.
5424 _OP_REQP = ["instance_name"]
5426 def CheckPrereq(self):
5427 """Check prerequisites.
5431 def Exec(self, feedback_fn):
5432 """Remove any export.
5435 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5436 # If the instance was not found we'll try with the name that was passed in.
5437 # This will only work if it was an FQDN, though.
5439 if not instance_name:
5441 instance_name = self.op.instance_name
5443 op = opcodes.OpQueryExports(nodes=[])
5444 exportlist = self.proc.ChainOpCode(op)
5446 for node in exportlist:
5447 if instance_name in exportlist[node]:
5449 if not rpc.call_export_remove(node, instance_name):
5450 logger.Error("could not remove export for instance %s"
5451 " on node %s" % (instance_name, node))
5453 if fqdn_warn and not found:
5454 feedback_fn("Export not found. If trying to remove an export belonging"
5455 " to a deleted instance please use its Fully Qualified"
5459 class TagsLU(NoHooksLU):
5462 This is an abstract class which is the parent of all the other tags LUs.
5465 def CheckPrereq(self):
5466 """Check prerequisites.
5469 if self.op.kind == constants.TAG_CLUSTER:
5470 self.target = self.cfg.GetClusterInfo()
5471 elif self.op.kind == constants.TAG_NODE:
5472 name = self.cfg.ExpandNodeName(self.op.name)
5474 raise errors.OpPrereqError("Invalid node name (%s)" %
5477 self.target = self.cfg.GetNodeInfo(name)
5478 elif self.op.kind == constants.TAG_INSTANCE:
5479 name = self.cfg.ExpandInstanceName(self.op.name)
5481 raise errors.OpPrereqError("Invalid instance name (%s)" %
5484 self.target = self.cfg.GetInstanceInfo(name)
5486 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5490 class LUGetTags(TagsLU):
5491 """Returns the tags of a given object.
5494 _OP_REQP = ["kind", "name"]
5496 def Exec(self, feedback_fn):
5497 """Returns the tag list.
5500 return self.target.GetTags()
5503 class LUSearchTags(NoHooksLU):
5504 """Searches the tags for a given pattern.
5507 _OP_REQP = ["pattern"]
5509 def CheckPrereq(self):
5510 """Check prerequisites.
5512 This checks the pattern passed for validity by compiling it.
5516 self.re = re.compile(self.op.pattern)
5517 except re.error, err:
5518 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5519 (self.op.pattern, err))
5521 def Exec(self, feedback_fn):
5522 """Returns the tag list.
5526 tgts = [("/cluster", cfg.GetClusterInfo())]
5527 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
5528 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5529 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
5530 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5532 for path, target in tgts:
5533 for tag in target.GetTags():
5534 if self.re.search(tag):
5535 results.append((path, tag))
5539 class LUAddTags(TagsLU):
5540 """Sets a tag on a given object.
5543 _OP_REQP = ["kind", "name", "tags"]
5545 def CheckPrereq(self):
5546 """Check prerequisites.
5548 This checks the type and length of the tag name and value.
5551 TagsLU.CheckPrereq(self)
5552 for tag in self.op.tags:
5553 objects.TaggableObject.ValidateTag(tag)
5555 def Exec(self, feedback_fn):
5560 for tag in self.op.tags:
5561 self.target.AddTag(tag)
5562 except errors.TagError, err:
5563 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5565 self.cfg.Update(self.target)
5566 except errors.ConfigurationError:
5567 raise errors.OpRetryError("There has been a modification to the"
5568 " config file and the operation has been"
5569 " aborted. Please retry.")
5572 class LUDelTags(TagsLU):
5573 """Delete a list of tags from a given object.
5576 _OP_REQP = ["kind", "name", "tags"]
5578 def CheckPrereq(self):
5579 """Check prerequisites.
5581 This checks that we have the given tag.
5584 TagsLU.CheckPrereq(self)
5585 for tag in self.op.tags:
5586 objects.TaggableObject.ValidateTag(tag, removal=True)
5587 del_tags = frozenset(self.op.tags)
5588 cur_tags = self.target.GetTags()
5589 if not del_tags <= cur_tags:
5590 diff_tags = del_tags - cur_tags
5591 diff_names = ["'%s'" % tag for tag in diff_tags]
5593 raise errors.OpPrereqError("Tag(s) %s not found" %
5594 (",".join(diff_names)))
5596 def Exec(self, feedback_fn):
5597 """Remove the tag from the object.
5600 for tag in self.op.tags:
5601 self.target.RemoveTag(tag)
5603 self.cfg.Update(self.target)
5604 except errors.ConfigurationError:
5605 raise errors.OpRetryError("There has been a modification to the"
5606 " config file and the operation has been"
5607 " aborted. Please retry.")
5609 class LUTestDelay(NoHooksLU):
5610 """Sleep for a specified amount of time.
5612 This LU sleeps on the master and/or nodes for a specified amoutn of
5616 _OP_REQP = ["duration", "on_master", "on_nodes"]
5618 def CheckPrereq(self):
5619 """Check prerequisites.
5621 This checks that we have a good list of nodes and/or the duration
5626 if self.op.on_nodes:
5627 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5629 def Exec(self, feedback_fn):
5630 """Do the actual sleep.
5633 if self.op.on_master:
5634 if not utils.TestDelay(self.op.duration):
5635 raise errors.OpExecError("Error during master delay test")
5636 if self.op.on_nodes:
5637 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5639 raise errors.OpExecError("Complete failure from rpc call")
5640 for node, node_result in result.items():
5642 raise errors.OpExecError("Failure during rpc call to node %s,"
5643 " result: %s" % (node, node_result))
5646 class IAllocator(object):
5647 """IAllocator framework.
5649 An IAllocator instance has three sets of attributes:
5650 - cfg/sstore that are needed to query the cluster
5651 - input data (all members of the _KEYS class attribute are required)
5652 - four buffer attributes (in|out_data|text), that represent the
5653 input (to the external script) in text and data structure format,
5654 and the output from it, again in two formats
5655 - the result variables from the script (success, info, nodes) for
5660 "mem_size", "disks", "disk_template",
5661 "os", "tags", "nics", "vcpus",
5667 def __init__(self, cfg, sstore, mode, name, **kwargs):
5669 self.sstore = sstore
5670 # init buffer variables
5671 self.in_text = self.out_text = self.in_data = self.out_data = None
5672 # init all input fields so that pylint is happy
5675 self.mem_size = self.disks = self.disk_template = None
5676 self.os = self.tags = self.nics = self.vcpus = None
5677 self.relocate_from = None
5679 self.required_nodes = None
5680 # init result fields
5681 self.success = self.info = self.nodes = None
5682 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5683 keyset = self._ALLO_KEYS
5684 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5685 keyset = self._RELO_KEYS
5687 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5688 " IAllocator" % self.mode)
5690 if key not in keyset:
5691 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5692 " IAllocator" % key)
5693 setattr(self, key, kwargs[key])
5695 if key not in kwargs:
5696 raise errors.ProgrammerError("Missing input parameter '%s' to"
5697 " IAllocator" % key)
5698 self._BuildInputData()
5700 def _ComputeClusterData(self):
5701 """Compute the generic allocator input data.
5703 This is the data that is independent of the actual operation.
5710 "cluster_name": self.sstore.GetClusterName(),
5711 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5712 "hypervisor_type": self.sstore.GetHypervisorType(),
5713 # we don't have job IDs
5716 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5720 node_list = cfg.GetNodeList()
5721 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5722 for nname in node_list:
5723 ninfo = cfg.GetNodeInfo(nname)
5724 if nname not in node_data or not isinstance(node_data[nname], dict):
5725 raise errors.OpExecError("Can't get data for node %s" % nname)
5726 remote_info = node_data[nname]
5727 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5728 'vg_size', 'vg_free', 'cpu_total']:
5729 if attr not in remote_info:
5730 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5733 remote_info[attr] = int(remote_info[attr])
5734 except ValueError, err:
5735 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5736 " %s" % (nname, attr, str(err)))
5737 # compute memory used by primary instances
5738 i_p_mem = i_p_up_mem = 0
5739 for iinfo in i_list:
5740 if iinfo.primary_node == nname:
5741 i_p_mem += iinfo.memory
5742 if iinfo.status == "up":
5743 i_p_up_mem += iinfo.memory
5745 # compute memory used by instances
5747 "tags": list(ninfo.GetTags()),
5748 "total_memory": remote_info['memory_total'],
5749 "reserved_memory": remote_info['memory_dom0'],
5750 "free_memory": remote_info['memory_free'],
5751 "i_pri_memory": i_p_mem,
5752 "i_pri_up_memory": i_p_up_mem,
5753 "total_disk": remote_info['vg_size'],
5754 "free_disk": remote_info['vg_free'],
5755 "primary_ip": ninfo.primary_ip,
5756 "secondary_ip": ninfo.secondary_ip,
5757 "total_cpus": remote_info['cpu_total'],
5759 node_results[nname] = pnr
5760 data["nodes"] = node_results
5764 for iinfo in i_list:
5765 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5766 for n in iinfo.nics]
5768 "tags": list(iinfo.GetTags()),
5769 "should_run": iinfo.status == "up",
5770 "vcpus": iinfo.vcpus,
5771 "memory": iinfo.memory,
5773 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5775 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5776 "disk_template": iinfo.disk_template,
5778 instance_data[iinfo.name] = pir
5780 data["instances"] = instance_data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    # network-mirrored templates need both a primary and a secondary node
    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
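
  # The relocate request is smaller; again a sketch with invented values:
  #
  #   {"type": "relocate", "name": "inst1.example.com",
  #    "disk_space_total": 5248, "required_nodes": 1,
  #    "relocate_from": ["node2.example.com"]}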

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout + stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()
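
  # Usage sketch ("my_allocator" is a hypothetical script name, resolved by
  # the runner on the master node):
  #
  #   ial.Run("my_allocator")                   # run and validate output
  #   ial.Run("my_allocator", validate=False)   # keep the raw output only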

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
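
  # A reply that passes the checks above would deserialize to something
  # like this (values invented):
  #
  #   {"success": True,
  #    "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node3.example.com"]}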


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus)
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from))

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
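
  # Note: with direction IALLOCATOR_DIR_IN the LU only returns the generated
  # input text (nothing is executed), while IALLOCATOR_DIR_OUT runs the named
  # allocator without validating its reply and returns the raw output; this
  # makes the LU usable as a debugging aid for allocator scripts.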