# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    An empty node list should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result
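

# Illustrative sketch (not part of the original module): a minimal LU
# following the base-class contract documented above. The opcode field
# "message" and the hook path "example-noop" are invented for this example.
class LUExampleNoop(LogicalUnit):
  """Example LU that only echoes a message back to the caller.

  """
  HPATH = "example-noop"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["message"]

  def BuildHooksEnv(self):
    # the keys get the GANETI_ prefix added by the hooks runner
    env = {"OP_TARGET": self.op.message}
    return env, [], [self.sstore.GetMasterNode()]

  def CheckPrereq(self):
    # canonicalize opcode parameters; a no-op has nothing else to verify
    self.op.message = str(self.op.message)

  def Exec(self, feedback_fn):
    feedback_fn("noop: %s" % self.op.message)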


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of node names (strings); an empty list means all nodes

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []
    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)
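
# Example (illustrative): with self.op.nodes == ["node1"], a LU would call
# _GetWantedNodes(self, ["node1"]) and get back the fully-expanded, sorted
# name list, e.g. ["node1.example.com"]; an empty list selects all nodes.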


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instance names (strings); an empty list means all
               instances

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: The selected output fields to check

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
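
# Example (illustrative): with static=["name"] and dynamic=["mfree"],
# selected=["name", "mfree"] passes silently, while selected=["name", "bogus"]
# raises OpPrereqError("Unknown output fields selected: bogus").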


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
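
# Example (illustrative, all values invented): a call such as
#   _BuildInstanceHookEnv("inst1", "node1", ["node2"], "debian-etch", "up",
#                         512, 1, [("192.0.2.10", "xen-br0", "aa:00:00:11:22:33")])
# yields INSTANCE_NAME=inst1, INSTANCE_PRIMARY=node1, INSTANCE_SECONDARIES=node2,
# INSTANCE_NIC0_IP=192.0.2.10, INSTANCE_NIC0_BRIDGE=xen-br0 and
# INSTANCE_NIC_COUNT=1; the hooks runner later prefixes each key with GANETI_.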


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  for rawline in f:
    logger.Debug('read %s' % (repr(rawline),))

    parts = rawline.rstrip('\r\n').split()

    # Ignore unwanted lines
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
      fields = parts[0].split(',')
      key = parts[2]

      haveall = True
      havesome = False
      for spec in [ ip, fullnode ]:
        if spec not in fields:
          haveall = False
        if spec in fields:
          havesome = True

      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
      if haveall and key == pubkey:
        inthere = True
        save_lines.append(rawline)
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
        continue

      if havesome and (not haveall or key != pubkey):
        removed = True
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
        continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   os.path.dirname(
                                     constants.SSH_KNOWN_HOSTS_FILE))
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None
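
# Example (illustrative): utils.ListVolumeGroups() returns a name->size(MiB)
# mapping, so _HasValidVG({"xenvg": 102400}, "xenvg") returns None (valid),
# while _HasValidVG({}, "xenvg") returns "volume group 'xenvg' missing".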


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))

    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          if instance_cfg[instance].auto_balance:
            needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad
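
  # Worked example (illustrative): if node B is secondary for instances i1
  # (512MB) and i2 (1024MB) that both have node A as primary, then B's
  # 'sinst-by-pnode' maps A -> [i1, i2] and the check above requires
  # mfree(B) >= 1536MB, since a failure of A would fail both instances
  # over to B at the same time.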

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure causes
    the output to be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    tags = self.cfg.GetClusterInfo().GetTags()
    # TODO: populate the environment with useful information for verify hooks
    env = {
      "CLUSTER_TAGS": " ".join(tags),
      }
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    incomplete_nodeinfo = False

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        incomplete_nodeinfo = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        incomplete_nodeinfo = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        incomplete_nodeinfo = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
          }
      except (ValueError, TypeError):
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        incomplete_nodeinfo = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not inst_config.auto_balance:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if (constants.VERIFY_NPLUSONE_MEM not in self.skip_set and
        not incomplete_nodeinfo):
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn(" - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn(" Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn(" Node %s:" % node_name)
                show_node_header = False
              feedback_fn(" ERROR: Script %s failed, output:" % script)
              output = indent_re.sub(' ', output)
              feedback_fn("%s" % output)
              lu_result = 1

    return lu_result
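
  # Example (illustrative): indent_re matches the start of every line, so
  # indent_re.sub(' ', "line1\nline2") returns " line1\n line2", indenting a
  # failing hook script's output before it is sent through feedback_fn.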


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
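
  # Example (illustrative, all names invented): a possible return value is
  #   ([], {}, ["inst3"], {"inst2": [("node1", "sda_data")]})
  # i.e. no unreachable nodes, no LVM enumeration errors, inst3 has offline
  # LVs that need reactivation, and inst2 is missing the LV "sda_data" on
  # node1.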


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
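
# Example (illustrative): while a DRBD resync is at 42.5% with an estimated
# 300 seconds left, each poll logs "- device sda: 42.50% done, 300 estimated
# seconds remaining" and then sleeps min(60, 300) = 60 seconds before
# re-polling.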


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    Args:
      node_list: a list with the names of all nodes
      rlist: a map with node names as keys and OS objects as values

    Returns:
      map: a map with osnames as keys and as value another map, with
           nodes as keys and list of OS objects as values
           e.g. {"debian-etch": {"node1": [<object>,...],
                                 "node2": [<object>,]}
                }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os
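
  # Example (illustrative): with node_list=["node1", "node2"] and
  # rlist={"node1": [os_a], "node2": []} where os_a.name == "debian-etch",
  # the result is {"debian-etch": {"node1": [os_a], "node2": []}}: every
  # known OS gets an entry for every node, empty where it was not reported.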

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the node exists in the configuration
    - it does not have primary or secondary instances
    - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal", "cnodes", "csockets",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
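
  # Example (illustrative): with output_fields=["name", "mfree", "pinst_cnt"],
  # each row of the result looks like ["node1.example.com", 3840, 2], ordered
  # like the requested fields; dynamic values fall back to None for nodes
  # that did not answer the info RPC.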


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the new node is not already in the config
    - it is resolvable
    - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))
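
    # Illustrative expansion of mycommand (with invented values): the remote
    # shell runs, roughly,
    #   umask 077 &&
    #   echo '<gntpass>' > '<node password file>' &&
    #   cat > '<SSL_CERT_FILE>' << '!EOF.' &&
    #   <PEM data>!EOF.
    #   <NODE_INITD_SCRIPT> restart
    # i.e. the shared password and the certificate are written out before the
    # node daemon is restarted; '!EOF.' is a safe here-document marker since
    # it cannot appear in valid PEM data (checked above).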

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    data = []
    for node in self.nodes:
      result = ssh.SSHCall(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info
2125 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2126 """Prepare the block devices for an instance.
2128 This sets up the block devices on all nodes.
2131 instance: a ganeti.objects.Instance object
2132 ignore_secondaries: if true, errors on secondary nodes won't result
2133 in an error return from the function
false if the operation failed
list of (host, instance_visible_name, node_visible_name) if the operation
succeeded, with the mapping from node devices to instance devices
2142 iname = instance.name
2143 # With the two passes mechanism we try to reduce the window of
2144 # opportunity for the race condition of switching DRBD to primary
# before handshaking occurred, but we do not eliminate it
2147 # The proper fix would be to wait (with some limits) until the
2148 # connection has been made and drbd transitions from WFConnection
2149 # into any other network-connected state (Connected, SyncTarget,
2152 # 1st pass, assemble on all nodes in secondary mode
2153 for inst_disk in instance.disks:
2154 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2155 cfg.SetDiskID(node_disk, node)
2156 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2158 logger.Error("could not prepare block device %s on node %s"
2159 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2160 if not ignore_secondaries:
2163 # FIXME: race condition on drbd migration to primary
2165 # 2nd pass, do only the primary node
2166 for inst_disk in instance.disks:
2167 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2168 if node != instance.primary_node:
2170 cfg.SetDiskID(node_disk, node)
2171 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2173 logger.Error("could not prepare block device %s on node %s"
2174 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2176 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2178 # leave the disks configured for the primary node
2179 # this is a workaround that would be fixed better by
2180 # improving the logical/physical id handling
2181 for disk in instance.disks:
2182 cfg.SetDiskID(disk, instance.primary_node)
2184 return disks_ok, device_info
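# Minimal usage sketch (hypothetical caller; names are illustrative):
#
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node, iv_name, node_dev in device_info:
#     feedback_fn("%s on %s is visible as %s" % (iv_name, node, node_dev))
#
# Note that device_info only describes the primary node's view of the
# disks, which is what the start and console code paths need.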
2187 def _StartInstanceDisks(cfg, instance, force):
2188 """Start the disks of an instance.
2191 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2192 ignore_secondaries=force)
2194 _ShutdownInstanceDisks(instance, cfg)
2195 if force is not None and not force:
2196 logger.Error("If the message above refers to a secondary node,"
2197 " you can retry the operation using '--force'.")
2198 raise errors.OpExecError("Disk consistency error")
2201 class LUDeactivateInstanceDisks(NoHooksLU):
2202 """Shutdown an instance's disks.
2205 _OP_REQP = ["instance_name"]
2207 def CheckPrereq(self):
2208 """Check prerequisites.
2210 This checks that the instance is in the cluster.
2213 instance = self.cfg.GetInstanceInfo(
2214 self.cfg.ExpandInstanceName(self.op.instance_name))
2215 if instance is None:
2216 raise errors.OpPrereqError("Instance '%s' not known" %
2217 self.op.instance_name)
2218 self.instance = instance
2220 def Exec(self, feedback_fn):
2221 """Deactivate the disks
2224 instance = self.instance
2225 ins_l = rpc.call_instance_list([instance.primary_node])
2226 ins_l = ins_l[instance.primary_node]
if not isinstance(ins_l, list):
2228 raise errors.OpExecError("Can't contact node '%s'" %
2229 instance.primary_node)
2231 if self.instance.name in ins_l:
2232 raise errors.OpExecError("Instance is running, can't shutdown"
2235 _ShutdownInstanceDisks(instance, self.cfg)
2238 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2239 """Shutdown block devices of an instance.
2241 This does the shutdown on all nodes of the instance.
If ignore_primary is false, errors on the primary node make the operation fail; otherwise they are ignored.
2248 for disk in instance.disks:
2249 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2250 cfg.SetDiskID(top_disk, node)
2251 if not rpc.call_blockdev_shutdown(node, top_disk):
2252 logger.Error("could not shutdown block device %s on node %s" %
2253 (disk.iv_name, node))
2254 if not ignore_primary or node != instance.primary_node:
2259 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2260 """Checks if a node has enough free memory.
This function checks if a given node has the needed amount of free
memory. In case the node has less memory or we cannot get the
information from the node, this function raises an OpPrereqError.
2268 - cfg: a ConfigWriter instance
2269 - node: the node name
2270 - reason: string to use in the error message
2271 - requested: the amount of memory in MiB
2274 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2275 if not (nodeinfo and isinstance(nodeinfo, dict) and
2276 node in nodeinfo and isinstance(nodeinfo[node], dict)):
2277 raise errors.OpPrereqError("Could not contact node %s for resource"
2278 " information" % (node,))
2280 free_mem = nodeinfo[node].get('memory_free')
2281 if not isinstance(free_mem, int):
2282 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2283 " was '%s'" % (node, free_mem))
2284 if requested > free_mem:
2285 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2286 " needed %s MiB, available %s MiB" %
2287 (node, reason, requested, free_mem))
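# Usage example, as the failover code below does it (see
# LUFailoverInstance.CheckPrereq):
#
#   _CheckNodeFreeMemory(self.cfg, target_node,
#                        "failing over instance %s" % instance.name,
#                        instance.memory)
#
# which raises OpPrereqError if the target node reports less free
# memory than the instance needs.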
2290 class LUStartupInstance(LogicalUnit):
2291 """Starts an instance.
2294 HPATH = "instance-start"
2295 HTYPE = constants.HTYPE_INSTANCE
2296 _OP_REQP = ["instance_name", "force"]
2298 def BuildHooksEnv(self):
2301 This runs on master, primary and secondary nodes of the instance.
2305 "FORCE": self.op.force,
2307 env.update(_BuildInstanceHookEnvByObject(self.instance))
2308 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2309 list(self.instance.secondary_nodes))
2312 def CheckPrereq(self):
2313 """Check prerequisites.
2315 This checks that the instance is in the cluster.
2318 instance = self.cfg.GetInstanceInfo(
2319 self.cfg.ExpandInstanceName(self.op.instance_name))
2320 if instance is None:
2321 raise errors.OpPrereqError("Instance '%s' not known" %
2322 self.op.instance_name)
# check bridges existence
2325 _CheckInstanceBridgesExist(instance)
2327 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2328 "starting instance %s" % instance.name,
2331 self.instance = instance
2332 self.op.instance_name = instance.name
2334 def Exec(self, feedback_fn):
2335 """Start the instance.
2338 instance = self.instance
2339 force = self.op.force
2340 extra_args = getattr(self.op, "extra_args", "")
2342 self.cfg.MarkInstanceUp(instance.name)
2344 node_current = instance.primary_node
2346 _StartInstanceDisks(self.cfg, instance, force)
2348 if not rpc.call_instance_start(node_current, instance, extra_args):
2349 _ShutdownInstanceDisks(instance, self.cfg)
2350 raise errors.OpExecError("Could not start instance")
2353 class LURebootInstance(LogicalUnit):
2354 """Reboot an instance.
2357 HPATH = "instance-reboot"
2358 HTYPE = constants.HTYPE_INSTANCE
2359 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2361 def BuildHooksEnv(self):
2364 This runs on master, primary and secondary nodes of the instance.
2368 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2370 env.update(_BuildInstanceHookEnvByObject(self.instance))
2371 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2372 list(self.instance.secondary_nodes))
2375 def CheckPrereq(self):
2376 """Check prerequisites.
2378 This checks that the instance is in the cluster.
2381 instance = self.cfg.GetInstanceInfo(
2382 self.cfg.ExpandInstanceName(self.op.instance_name))
2383 if instance is None:
2384 raise errors.OpPrereqError("Instance '%s' not known" %
2385 self.op.instance_name)
# check bridges existence
2388 _CheckInstanceBridgesExist(instance)
2390 self.instance = instance
2391 self.op.instance_name = instance.name
2393 def Exec(self, feedback_fn):
2394 """Reboot the instance.
2397 instance = self.instance
2398 ignore_secondaries = self.op.ignore_secondaries
2399 reboot_type = self.op.reboot_type
2400 extra_args = getattr(self.op, "extra_args", "")
2402 node_current = instance.primary_node
2404 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2405 constants.INSTANCE_REBOOT_HARD,
2406 constants.INSTANCE_REBOOT_FULL]:
2407 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2408 (constants.INSTANCE_REBOOT_SOFT,
2409 constants.INSTANCE_REBOOT_HARD,
2410 constants.INSTANCE_REBOOT_FULL))
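# The three reboot types are handled differently below: SOFT and HARD
# are delegated to the node daemon through a single instance_reboot
# RPC, while FULL is emulated from the master as an explicit shutdown,
# disk deactivation/reactivation and start.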
2412 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2413 constants.INSTANCE_REBOOT_HARD]:
2414 if not rpc.call_instance_reboot(node_current, instance,
2415 reboot_type, extra_args):
2416 raise errors.OpExecError("Could not reboot instance")
2418 if not rpc.call_instance_shutdown(node_current, instance):
2419 raise errors.OpExecError("could not shutdown instance for full reboot")
2420 _ShutdownInstanceDisks(instance, self.cfg)
2421 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2422 if not rpc.call_instance_start(node_current, instance, extra_args):
2423 _ShutdownInstanceDisks(instance, self.cfg)
2424 raise errors.OpExecError("Could not start instance for full reboot")
2426 self.cfg.MarkInstanceUp(instance.name)
2429 class LUShutdownInstance(LogicalUnit):
2430 """Shutdown an instance.
2433 HPATH = "instance-stop"
2434 HTYPE = constants.HTYPE_INSTANCE
2435 _OP_REQP = ["instance_name"]
2437 def BuildHooksEnv(self):
2440 This runs on master, primary and secondary nodes of the instance.
2443 env = _BuildInstanceHookEnvByObject(self.instance)
2444 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2445 list(self.instance.secondary_nodes))
2448 def CheckPrereq(self):
2449 """Check prerequisites.
2451 This checks that the instance is in the cluster.
2454 instance = self.cfg.GetInstanceInfo(
2455 self.cfg.ExpandInstanceName(self.op.instance_name))
2456 if instance is None:
2457 raise errors.OpPrereqError("Instance '%s' not known" %
2458 self.op.instance_name)
2459 self.instance = instance
2461 def Exec(self, feedback_fn):
2462 """Shutdown the instance.
2465 instance = self.instance
2466 node_current = instance.primary_node
2467 self.cfg.MarkInstanceDown(instance.name)
2468 if not rpc.call_instance_shutdown(node_current, instance):
2469 logger.Error("could not shutdown instance")
2471 _ShutdownInstanceDisks(instance, self.cfg)
2474 class LUReinstallInstance(LogicalUnit):
2475 """Reinstall an instance.
2478 HPATH = "instance-reinstall"
2479 HTYPE = constants.HTYPE_INSTANCE
2480 _OP_REQP = ["instance_name"]
2482 def BuildHooksEnv(self):
2485 This runs on master, primary and secondary nodes of the instance.
2488 env = _BuildInstanceHookEnvByObject(self.instance)
2489 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2490 list(self.instance.secondary_nodes))
2493 def CheckPrereq(self):
2494 """Check prerequisites.
2496 This checks that the instance is in the cluster and is not running.
2499 instance = self.cfg.GetInstanceInfo(
2500 self.cfg.ExpandInstanceName(self.op.instance_name))
2501 if instance is None:
2502 raise errors.OpPrereqError("Instance '%s' not known" %
2503 self.op.instance_name)
2504 if instance.disk_template == constants.DT_DISKLESS:
2505 raise errors.OpPrereqError("Instance '%s' has no disks" %
2506 self.op.instance_name)
2507 if instance.status != "down":
2508 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2509 self.op.instance_name)
2510 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2512 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2513 (self.op.instance_name,
2514 instance.primary_node))
2516 self.op.os_type = getattr(self.op, "os_type", None)
2517 if self.op.os_type is not None:
2519 pnode = self.cfg.GetNodeInfo(
2520 self.cfg.ExpandNodeName(instance.primary_node))
2522 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2524 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2526 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2527 " primary node" % self.op.os_type)
2529 self.instance = instance
2531 def Exec(self, feedback_fn):
2532 """Reinstall the instance.
2535 inst = self.instance
2537 if self.op.os_type is not None:
2538 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2539 inst.os = self.op.os_type
2540 self.cfg.AddInstance(inst)
2542 _StartInstanceDisks(self.cfg, inst, None)
2544 feedback_fn("Running the instance OS create scripts...")
2545 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2546 raise errors.OpExecError("Could not install OS for instance %s"
" on node %s" % (inst.name, inst.primary_node))
2550 _ShutdownInstanceDisks(inst, self.cfg)
2553 class LURenameInstance(LogicalUnit):
2554 """Rename an instance.
2557 HPATH = "instance-rename"
2558 HTYPE = constants.HTYPE_INSTANCE
2559 _OP_REQP = ["instance_name", "new_name"]
2561 def BuildHooksEnv(self):
2564 This runs on master, primary and secondary nodes of the instance.
2567 env = _BuildInstanceHookEnvByObject(self.instance)
2568 env["INSTANCE_NEW_NAME"] = self.op.new_name
2569 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2570 list(self.instance.secondary_nodes))
2573 def CheckPrereq(self):
2574 """Check prerequisites.
2576 This checks that the instance is in the cluster and is not running.
2579 instance = self.cfg.GetInstanceInfo(
2580 self.cfg.ExpandInstanceName(self.op.instance_name))
2581 if instance is None:
2582 raise errors.OpPrereqError("Instance '%s' not known" %
2583 self.op.instance_name)
2584 if instance.status != "down":
2585 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2586 self.op.instance_name)
2587 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
if remote_info:
raise errors.OpPrereqError("Instance '%s' is running on node %s" %
2590 (self.op.instance_name,
2591 instance.primary_node))
2592 self.instance = instance
2594 # new name verification
2595 name_info = utils.HostInfo(self.op.new_name)
2597 self.op.new_name = new_name = name_info.name
2598 instance_list = self.cfg.GetInstanceList()
2599 if new_name in instance_list:
2600 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2603 if not getattr(self.op, "ignore_ip", False):
2604 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2605 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2606 (name_info.ip, new_name))
2609 def Exec(self, feedback_fn):
2610 """Reinstall the instance.
2613 inst = self.instance
2614 old_name = inst.name
2616 self.cfg.RenameInstance(inst.name, self.op.new_name)
2618 # re-read the instance from the configuration after rename
2619 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2621 _StartInstanceDisks(self.cfg, inst, None)
2623 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2625 msg = ("Could not run OS rename script for instance %s on node %s"
2626 " (but the instance has been renamed in Ganeti)" %
2627 (inst.name, inst.primary_node))
2630 _ShutdownInstanceDisks(inst, self.cfg)
2633 class LURemoveInstance(LogicalUnit):
2634 """Remove an instance.
2637 HPATH = "instance-remove"
2638 HTYPE = constants.HTYPE_INSTANCE
2639 _OP_REQP = ["instance_name", "ignore_failures"]
2641 def BuildHooksEnv(self):
2644 This runs on master, primary and secondary nodes of the instance.
2647 env = _BuildInstanceHookEnvByObject(self.instance)
2648 nl = [self.sstore.GetMasterNode()]
2651 def CheckPrereq(self):
2652 """Check prerequisites.
2654 This checks that the instance is in the cluster.
2657 instance = self.cfg.GetInstanceInfo(
2658 self.cfg.ExpandInstanceName(self.op.instance_name))
2659 if instance is None:
2660 raise errors.OpPrereqError("Instance '%s' not known" %
2661 self.op.instance_name)
2662 self.instance = instance
2664 def Exec(self, feedback_fn):
2665 """Remove the instance.
2668 instance = self.instance
2669 logger.Info("shutting down instance %s on node %s" %
2670 (instance.name, instance.primary_node))
2672 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2673 if self.op.ignore_failures:
2674 feedback_fn("Warning: can't shutdown instance")
2676 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2677 (instance.name, instance.primary_node))
2679 logger.Info("removing block devices for instance %s" % instance.name)
2681 if not _RemoveDisks(instance, self.cfg):
2682 if self.op.ignore_failures:
2683 feedback_fn("Warning: can't remove instance's disks")
2685 raise errors.OpExecError("Can't remove instance's disks")
2687 logger.Info("removing instance %s out of cluster config" % instance.name)
2689 self.cfg.RemoveInstance(instance.name)
2692 class LUQueryInstances(NoHooksLU):
2693 """Logical unit for querying instances.
2696 _OP_REQP = ["output_fields", "names"]
2698 def CheckPrereq(self):
2699 """Check prerequisites.
2701 This checks that the fields required are valid output fields.
2704 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2705 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2706 "admin_state", "admin_ram",
2707 "disk_template", "ip", "mac", "bridge",
2708 "sda_size", "sdb_size", "vcpus", "tags",
2710 "network_port", "kernel_path", "initrd_path",
2711 "hvm_boot_order", "hvm_acpi", "hvm_pae",
2712 "hvm_cdrom_image_path", "hvm_nic_type",
2713 "hvm_disk_type", "vnc_bind_address"],
2714 dynamic=self.dynamic_fields,
2715 selected=self.op.output_fields)
2717 self.wanted = _GetWantedInstances(self, self.op.names)
2719 def Exec(self, feedback_fn):
2720 """Computes the list of nodes and their attributes.
2723 instance_names = self.wanted
instance_list = [self.cfg.GetInstanceInfo(iname) for iname in instance_names]
2727 # begin data gathering
2729 nodes = frozenset([inst.primary_node for inst in instance_list])
2732 if self.dynamic_fields.intersection(self.op.output_fields):
2734 node_data = rpc.call_all_instances_info(nodes)
for name in nodes:
result = node_data[name]
if result:
live_data.update(result)
elif result is False:
2740 bad_nodes.append(name)
2741 # else no instance is alive
2743 live_data = dict([(name, {}) for name in instance_names])
2745 # end data gathering
2748 for instance in instance_list:
2750 for field in self.op.output_fields:
2755 elif field == "pnode":
2756 val = instance.primary_node
2757 elif field == "snodes":
2758 val = list(instance.secondary_nodes)
2759 elif field == "admin_state":
2760 val = (instance.status != "down")
2761 elif field == "oper_state":
2762 if instance.primary_node in bad_nodes:
2765 val = bool(live_data.get(instance.name))
2766 elif field == "status":
2767 if instance.primary_node in bad_nodes:
2768 val = "ERROR_nodedown"
2770 running = bool(live_data.get(instance.name))
2772 if instance.status != "down":
2777 if instance.status != "down":
2781 elif field == "admin_ram":
2782 val = instance.memory
2783 elif field == "oper_ram":
2784 if instance.primary_node in bad_nodes:
2786 elif instance.name in live_data:
2787 val = live_data[instance.name].get("memory", "?")
2790 elif field == "disk_template":
2791 val = instance.disk_template
elif field == "ip":
val = instance.nics[0].ip
2794 elif field == "bridge":
2795 val = instance.nics[0].bridge
2796 elif field == "mac":
2797 val = instance.nics[0].mac
2798 elif field == "sda_size" or field == "sdb_size":
2799 disk = instance.FindDisk(field[:3])
2804 elif field == "vcpus":
2805 val = instance.vcpus
2806 elif field == "tags":
2807 val = list(instance.GetTags())
2808 elif field == "auto_balance":
2809 val = instance.auto_balance
2810 elif field in ("network_port", "kernel_path", "initrd_path",
2811 "hvm_boot_order", "hvm_acpi", "hvm_pae",
2812 "hvm_cdrom_image_path", "hvm_nic_type",
2813 "hvm_disk_type", "vnc_bind_address"):
2814 val = getattr(instance, field, None)
2816 if field in ("hvm_nic_type", "hvm_disk_type",
2817 "kernel_path", "initrd_path"):
2822 raise errors.ParameterError(field)
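# Illustrative query result (hypothetical values): with
# output_fields == ["name", "pnode", "oper_ram"], each instance yields
# a row like ["inst1.example.com", "node1.example.com", 512], with "?"
# used where a live value cannot be determined.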
2829 class LUFailoverInstance(LogicalUnit):
2830 """Failover an instance.
2833 HPATH = "instance-failover"
2834 HTYPE = constants.HTYPE_INSTANCE
2835 _OP_REQP = ["instance_name", "ignore_consistency"]
2837 def BuildHooksEnv(self):
2840 This runs on master, primary and secondary nodes of the instance.
2844 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2846 env.update(_BuildInstanceHookEnvByObject(self.instance))
2847 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2850 def CheckPrereq(self):
2851 """Check prerequisites.
2853 This checks that the instance is in the cluster.
2856 instance = self.cfg.GetInstanceInfo(
2857 self.cfg.ExpandInstanceName(self.op.instance_name))
2858 if instance is None:
2859 raise errors.OpPrereqError("Instance '%s' not known" %
2860 self.op.instance_name)
2862 if instance.disk_template not in constants.DTS_NET_MIRROR:
2863 raise errors.OpPrereqError("Instance's disk layout is not"
2864 " network mirrored, cannot failover.")
2866 secondary_nodes = instance.secondary_nodes
2867 if not secondary_nodes:
raise errors.ProgrammerError("no secondary node but using "
"a network-mirrored disk template")
2871 target_node = secondary_nodes[0]
2872 # check memory requirements on the secondary node
2873 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2874 instance.name, instance.memory)
# check bridge existence
2877 brlist = [nic.bridge for nic in instance.nics]
2878 if not rpc.call_bridges_exist(target_node, brlist):
2879 raise errors.OpPrereqError("One or more target bridges %s does not"
2880 " exist on destination node '%s'" %
2881 (brlist, target_node))
2883 self.instance = instance
2885 def Exec(self, feedback_fn):
2886 """Failover an instance.
2888 The failover is done by shutting it down on its present node and
2889 starting it on the secondary.
2892 instance = self.instance
2894 source_node = instance.primary_node
2895 target_node = instance.secondary_nodes[0]
2897 feedback_fn("* checking disk consistency between source and target")
2898 for dev in instance.disks:
2899 # for remote_raid1, these are md over drbd
2900 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2901 if instance.status == "up" and not self.op.ignore_consistency:
2902 raise errors.OpExecError("Disk %s is degraded on target node,"
2903 " aborting failover." % dev.iv_name)
2905 feedback_fn("* shutting down instance on source node")
2906 logger.Info("Shutting down instance %s on node %s" %
2907 (instance.name, source_node))
2909 if not rpc.call_instance_shutdown(source_node, instance):
2910 if self.op.ignore_consistency:
2911 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2912 " anyway. Please make sure node %s is down" %
2913 (instance.name, source_node, source_node))
2915 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2916 (instance.name, source_node))
2918 feedback_fn("* deactivating the instance's disks on source node")
2919 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2920 raise errors.OpExecError("Can't shut down the instance's disks.")
2922 instance.primary_node = target_node
2923 # distribute new instance config to the other nodes
2924 self.cfg.Update(instance)
2926 # Only start the instance if it's marked as up
2927 if instance.status == "up":
2928 feedback_fn("* activating the instance's disks on target node")
2929 logger.Info("Starting instance %s on node %s" %
2930 (instance.name, target_node))
2932 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2933 ignore_secondaries=True)
2935 _ShutdownInstanceDisks(instance, self.cfg)
2936 raise errors.OpExecError("Can't activate the instance's disks")
2938 feedback_fn("* starting the instance on the target node")
2939 if not rpc.call_instance_start(target_node, instance, None):
2940 _ShutdownInstanceDisks(instance, self.cfg)
2941 raise errors.OpExecError("Could not start instance %s on node %s." %
2942 (instance.name, target_node))
2945 class LUMigrateInstance(LogicalUnit):
2946 """Migrate an instance.
This is a live migration: the instance is moved to its secondary node
without being shut down, unlike failover, which requires a shutdown.
2952 HPATH = "instance-migrate"
2953 HTYPE = constants.HTYPE_INSTANCE
2954 _OP_REQP = ["instance_name", "live", "cleanup"]
2956 def BuildHooksEnv(self):
2959 This runs on master, primary and secondary nodes of the instance.
2962 env = _BuildInstanceHookEnvByObject(self.instance)
2963 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2966 def CheckPrereq(self):
2967 """Check prerequisites.
2969 This checks that the instance is in the cluster.
2972 instance = self.cfg.GetInstanceInfo(
2973 self.cfg.ExpandInstanceName(self.op.instance_name))
2974 if instance is None:
2975 raise errors.OpPrereqError("Instance '%s' not known" %
2976 self.op.instance_name)
2978 if instance.disk_template != constants.DT_DRBD8:
2979 raise errors.OpPrereqError("Instance's disk layout is not"
2980 " drbd8, cannot migrate.")
2982 secondary_nodes = instance.secondary_nodes
2983 if not secondary_nodes:
2984 raise errors.ProgrammerError("no secondary node but using "
2985 "drbd8 disk template")
2987 target_node = secondary_nodes[0]
2988 # check memory requirements on the secondary node
2989 _CheckNodeFreeMemory(self.cfg, target_node, "migrating instance %s" %
2990 instance.name, instance.memory)
# check bridge existence
2993 brlist = [nic.bridge for nic in instance.nics]
2994 if not rpc.call_bridges_exist(target_node, brlist):
2995 raise errors.OpPrereqError("One or more target bridges %s does not"
2996 " exist on destination node '%s'" %
2997 (brlist, target_node))
2999 if not self.op.cleanup:
3000 migratable = rpc.call_instance_migratable(instance.primary_node,
3003 raise errors.OpPrereqError("Can't contact node '%s'" %
3004 instance.primary_node)
3005 if not migratable[0]:
3006 raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3009 self.instance = instance
3011 def _WaitUntilSync(self):
3012 """Poll with custom rpc for disk sync.
3014 This uses our own step-based rpc call.
3017 self.feedback_fn("* wait until resync is done")
3021 result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3022 self.instance.disks,
3023 self.nodes_ip, False,
3024 constants.DRBD_RECONF_RPC_WFSYNC)
3026 for node in self.all_nodes:
3027 if not result[node] or not result[node][0]:
3028 raise errors.OpExecError("Cannot resync disks on node %s" % (node,))
3029 node_done, node_percent = result[node][1]
3030 all_done = all_done and node_done
3031 if node_percent is not None:
3032 min_percent = min(min_percent, node_percent)
3034 if min_percent < 100:
3035 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3038 def _EnsureSecondary(self, node):
3039 """Demote a node to secondary.
3042 self.feedback_fn("* switching node %s to secondary mode" % node)
3043 result = rpc.call_drbd_reconfig_net([node], self.instance.name,
3044 self.instance.disks,
3045 self.nodes_ip, False,
3046 constants.DRBD_RECONF_RPC_SECONDARY)
3047 if not result[node] or not result[node][0]:
3048 raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3050 (node, result[node][1]))
3052 def _GoStandalone(self):
3053 """Disconnect from the network.
3056 self.feedback_fn("* changing into standalone mode")
3057 result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3058 self.instance.disks,
3059 self.nodes_ip, True,
3060 constants.DRBD_RECONF_RPC_DISCONNECT)
3061 for node in self.all_nodes:
3062 if not result[node] or not result[node][0]:
3063 raise errors.OpExecError("Cannot disconnect disks node %s,"
3064 " error %s" % (node, result[node][1]))
3066 def _GoReconnect(self, multimaster):
3067 """Reconnect to the network.
3073 msg = "single-master"
3074 self.feedback_fn("* changing disks into %s mode" % msg)
3075 result = rpc.call_drbd_reconfig_net(self.all_nodes, self.instance.name,
3076 self.instance.disks,
3079 constants.DRBD_RECONF_RPC_RECONNECT)
3080 for node in self.all_nodes:
3081 if not result[node] or not result[node][0]:
3082 raise errors.OpExecError("Cannot change disks config on node %s,"
3083 " error %s" % (node, result[node][1]))
3085 def _IdentifyDisks(self):
3086 """Start the migration RPC sequence.
3089 self.feedback_fn("* identifying disks")
3090 result = rpc.call_drbd_reconfig_net(self.all_nodes,
3092 self.instance.disks,
3093 self.nodes_ip, True,
3094 constants.DRBD_RECONF_RPC_INIT)
3095 for node in self.all_nodes:
3096 if not result[node] or not result[node][0]:
3097 raise errors.OpExecError("Cannot identify disks node %s,"
3098 " error %s" % (node, result[node][1]))
3100 def _ExecCleanup(self):
3101 """Try to cleanup after a failed migration.
3103 The cleanup is done by:
3104 - check that the instance is running only on one node
3105 (and update the config if needed)
3106 - change disks on its secondary node to secondary
3107 - wait until disks are fully synchronized
3108 - disconnect from the network
3109 - change disks into single-master mode
3110 - wait again until disks are fully synchronized
3113 instance = self.instance
3114 target_node = self.target_node
3115 source_node = self.source_node
3117 # check running on only one node
3118 self.feedback_fn("* checking where the instance actually runs"
3119 " (if this hangs, the hypervisor might be in"
3121 ins_l = rpc.call_instance_list(self.all_nodes)
3122 for node in self.all_nodes:
if not isinstance(ins_l[node], list):
3124 raise errors.OpExecError("Can't contact node '%s'" % node)
3126 runningon_source = instance.name in ins_l[source_node]
3127 runningon_target = instance.name in ins_l[target_node]
3129 if runningon_source and runningon_target:
3130 raise errors.OpExecError("Instance seems to be running on two nodes,"
3131 " or the hypervisor is confused. You will have"
3132 " to ensure manually that it runs only on one"
3133 " and restart this operation.")
3135 if not (runningon_source or runningon_target):
3136 raise errors.OpExecError("Instance does not seem to be running at all."
3137 " In this case, it's safer to repair by"
3138 " running 'gnt-instance stop' to ensure disk"
3139 " shutdown, and then restarting it.")
3141 if runningon_target:
3142 # the migration has actually succeeded, we need to update the config
3143 self.feedback_fn("* instance running on secondary node (%s),"
3144 " updating config" % target_node)
3145 instance.primary_node = target_node
3146 self.cfg.Update(instance)
3147 demoted_node = source_node
3149 self.feedback_fn("* instance confirmed to be running on its"
3150 " primary node (%s)" % source_node)
3151 demoted_node = target_node
3153 self._IdentifyDisks()
3155 self._EnsureSecondary(demoted_node)
3156 self._WaitUntilSync()
3157 self._GoStandalone()
3158 self._GoReconnect(False)
3159 self._WaitUntilSync()
3161 self.feedback_fn("* done")
3163 def _ExecMigration(self):
3164 """Migrate an instance.
3166 The migrate is done by:
3167 - change the disks into dual-master mode
3168 - wait until disks are fully synchronized again
3169 - migrate the instance
3170 - change disks on the new secondary node (the old primary) to secondary
3171 - wait until disks are fully synchronized
3172 - change disks into single-master mode
3175 instance = self.instance
3176 target_node = self.target_node
3177 source_node = self.source_node
3179 self.feedback_fn("* checking disk consistency between source and target")
3180 for dev in instance.disks:
3181 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
3182 raise errors.OpExecError("Disk %s is degraded or not fully"
3183 " synchronized on target node,"
3184 " aborting migrate." % dev.iv_name)
3186 self._IdentifyDisks()
3188 self._EnsureSecondary(target_node)
3189 self._GoStandalone()
3190 self._GoReconnect(True)
3191 self._WaitUntilSync()
3193 self.feedback_fn("* migrating instance to %s" % target_node)
3195 result = rpc.call_instance_migrate(source_node, instance,
3196 self.nodes_ip[target_node],
3198 if not result or not result[0]:
3199 logger.Error("Instance migration failed, trying to revert disk status")
3201 self._EnsureSecondary(target_node)
3202 self._GoStandalone()
3203 self._GoReconnect(False)
3204 self._WaitUntilSync()
3205 except errors.OpExecError, err:
3206 logger.Error("Can't reconnect the drives: error '%s'\n"
3207 "Please look and recover the instance status" % str(err))
3209 raise errors.OpExecError("Could not migrate instance %s: %s" %
3210 (instance.name, result[1]))
3213 instance.primary_node = target_node
3214 # distribute new instance config to the other nodes
3215 self.cfg.Update(instance)
3217 self._EnsureSecondary(source_node)
3218 self._WaitUntilSync()
3219 self._GoStandalone()
3220 self._GoReconnect(False)
3221 self._WaitUntilSync()
3223 self.feedback_fn("* done")
3225 def Exec(self, feedback_fn):
3226 """Perform the migration.
3229 self.feedback_fn = feedback_fn
3231 self.source_node = self.instance.primary_node
3232 self.target_node = self.instance.secondary_nodes[0]
3233 self.all_nodes = [self.source_node, self.target_node]
self.nodes_ip = {
self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
}

if self.op.cleanup:
return self._ExecCleanup()
else:
return self._ExecMigration()
3244 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
3245 """Create a tree of block devices on the primary node.
3247 This always creates all devices.
3251 for child in device.children:
3252 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
3255 cfg.SetDiskID(device, node)
3256 new_id = rpc.call_blockdev_create(node, device, device.size,
3257 instance.name, True, info)
3260 if device.physical_id is None:
3261 device.physical_id = new_id
3265 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
3266 """Create a tree of block devices on a secondary node.
If this device type has to be created on secondaries, create it and all its children.
3271 If not, just recurse to children keeping the same 'force' value.
3274 if device.CreateOnSecondary():
3277 for child in device.children:
3278 if not _CreateBlockDevOnSecondary(cfg, node, instance,
3279 child, force, info):
3284 cfg.SetDiskID(device, node)
3285 new_id = rpc.call_blockdev_create(node, device, device.size,
3286 instance.name, False, info)
3289 if device.physical_id is None:
3290 device.physical_id = new_id
3294 def _GenerateUniqueNames(cfg, exts):
3295 """Generate a suitable LV name.
3297 This will generate a logical volume name for the given instance.
3302 new_id = cfg.GenerateUniqueID()
3303 results.append("%s%s" % (new_id, val))
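# Illustrative result (the exact format of the unique ID is an
# implementation detail of the config writer's GenerateUniqueID):
#
#   _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta"])
#   => ["<unique-id-1>.sda_data", "<unique-id-2>.sda_meta"]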
3307 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
3308 """Generate a drbd device complete with its children.
3311 port = cfg.AllocatePort()
3312 vgname = cfg.GetVGName()
3313 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3314 logical_id=(vgname, names[0]))
3315 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3316 logical_id=(vgname, names[1]))
3317 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
3318 logical_id = (primary, secondary, port),
3319 children = [dev_data, dev_meta])
3323 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
3324 """Generate a drbd8 device complete with its children.
3327 port = cfg.AllocatePort()
3328 vgname = cfg.GetVGName()
3329 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3330 logical_id=(vgname, names[0]))
3331 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3332 logical_id=(vgname, names[1]))
3333 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3334 logical_id = (primary, secondary, port),
3335 children = [dev_data, dev_meta],
3339 def _GenerateDiskTemplate(cfg, template_name,
3340 instance_name, primary_node,
3341 secondary_nodes, disk_sz, swap_sz):
3342 """Generate the entire disk layout for a given template type.
3345 #TODO: compute space requirements
3347 vgname = cfg.GetVGName()
3348 if template_name == constants.DT_DISKLESS:
3350 elif template_name == constants.DT_PLAIN:
3351 if len(secondary_nodes) != 0:
3352 raise errors.ProgrammerError("Wrong template configuration")
3354 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
3355 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3356 logical_id=(vgname, names[0]),
3358 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3359 logical_id=(vgname, names[1]),
3361 disks = [sda_dev, sdb_dev]
3362 elif template_name == constants.DT_LOCAL_RAID1:
3363 if len(secondary_nodes) != 0:
3364 raise errors.ProgrammerError("Wrong template configuration")
3367 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
3368 ".sdb_m1", ".sdb_m2"])
3369 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3370 logical_id=(vgname, names[0]))
3371 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3372 logical_id=(vgname, names[1]))
3373 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
3375 children = [sda_dev_m1, sda_dev_m2])
3376 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3377 logical_id=(vgname, names[2]))
3378 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3379 logical_id=(vgname, names[3]))
3380 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
3382 children = [sdb_dev_m1, sdb_dev_m2])
3383 disks = [md_sda_dev, md_sdb_dev]
3384 elif template_name == constants.DT_REMOTE_RAID1:
3385 if len(secondary_nodes) != 1:
3386 raise errors.ProgrammerError("Wrong template configuration")
3387 remote_node = secondary_nodes[0]
3388 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3389 ".sdb_data", ".sdb_meta"])
3390 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3391 disk_sz, names[0:2])
3392 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
3393 children = [drbd_sda_dev], size=disk_sz)
3394 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
3395 swap_sz, names[2:4])
3396 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
3397 children = [drbd_sdb_dev], size=swap_sz)
3398 disks = [md_sda_dev, md_sdb_dev]
3399 elif template_name == constants.DT_DRBD8:
3400 if len(secondary_nodes) != 1:
3401 raise errors.ProgrammerError("Wrong template configuration")
3402 remote_node = secondary_nodes[0]
3403 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3404 ".sdb_data", ".sdb_meta"])
3405 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3406 disk_sz, names[0:2], "sda")
3407 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3408 swap_sz, names[2:4], "sdb")
3409 disks = [drbd_sda_dev, drbd_sdb_dev]
3411 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3415 def _GetInstanceInfoText(instance):
3416 """Compute that text that should be added to the disk's metadata.
3419 return "originstname+%s" % instance.name
3422 def _CreateDisks(cfg, instance):
3423 """Create all disks for an instance.
3425 This abstracts away some work from AddInstance.
3428 instance: the instance object
3431 True or False showing the success of the creation process
3434 info = _GetInstanceInfoText(instance)
3436 for device in instance.disks:
3437 logger.Info("creating volume %s for instance %s" %
3438 (device.iv_name, instance.name))
3440 for secondary_node in instance.secondary_nodes:
3441 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3442 device, False, info):
3443 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3444 (device.iv_name, device, secondary_node))
3447 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3448 instance, device, info):
3449 logger.Error("failed to create volume %s on primary!" %
3455 def _RemoveDisks(instance, cfg):
3456 """Remove all disks for an instance.
3458 This abstracts away some work from `AddInstance()` and
3459 `RemoveInstance()`. Note that in case some of the devices couldn't
3460 be removed, the removal will continue with the other ones (compare
3461 with `_CreateDisks()`).
3464 instance: the instance object
True or False showing the success of the removal process
3470 logger.Info("removing block devices for instance %s" % instance.name)
3473 for device in instance.disks:
3474 for node, disk in device.ComputeNodeTree(instance.primary_node):
3475 cfg.SetDiskID(disk, node)
3476 if not rpc.call_blockdev_remove(node, disk):
3477 logger.Error("could not remove block device %s on node %s,"
3478 " continuing anyway" %
3479 (device.iv_name, node))
3484 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3485 """Compute disk size requirements in the volume group
3487 This is currently hard-coded for the two-drive layout.
3490 # Required free disk space as a function of disk and swap space
3492 constants.DT_DISKLESS: None,
3493 constants.DT_PLAIN: disk_size + swap_size,
3494 constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3495 # 256 MB are added for drbd metadata, 128MB for each drbd device
3496 constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3497 constants.DT_DRBD8: disk_size + swap_size + 256,
3500 if disk_template not in req_size_dict:
3501 raise errors.ProgrammerError("Disk template '%s' size requirement"
3502 " is unknown" % disk_template)
3504 return req_size_dict[disk_template]
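# Worked example (sizes illustrative): a DT_DRBD8 instance with a
# 10240 MB disk and 4096 MB swap needs 10240 + 4096 + 256 = 14592 MB
# of free space in the volume group, the extra 256 MB being the DRBD
# metadata (128 MB per device); DT_DISKLESS yields None, i.e. no
# volume group space at all.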
3507 class LUCreateInstance(LogicalUnit):
3508 """Create an instance.
3511 HPATH = "instance-add"
3512 HTYPE = constants.HTYPE_INSTANCE
3513 _OP_REQP = ["instance_name", "mem_size", "disk_size",
3514 "disk_template", "swap_size", "mode", "start", "vcpus",
3515 "wait_for_sync", "ip_check", "mac", "auto_balance"]
3517 def _RunAllocator(self):
3518 """Run the allocator based on input opcode.
3521 disks = [{"size": self.op.disk_size, "mode": "w"},
3522 {"size": self.op.swap_size, "mode": "w"}]
3523 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3524 "bridge": self.op.bridge}]
3525 ial = IAllocator(self.cfg, self.sstore,
3526 mode=constants.IALLOCATOR_MODE_ALLOC,
3527 name=self.op.instance_name,
3528 disk_template=self.op.disk_template,
3531 vcpus=self.op.vcpus,
3532 mem_size=self.op.mem_size,
3537 ial.Run(self.op.iallocator)
3540 raise errors.OpPrereqError("Can't compute nodes using"
3541 " iallocator '%s': %s" % (self.op.iallocator,
3543 if len(ial.nodes) != ial.required_nodes:
3544 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3545 " of nodes (%s), required %s" %
3546 (self.op.iallocator, len(ial.nodes),
3547 ial.required_nodes))
3548 self.op.pnode = ial.nodes[0]
3549 logger.ToStdout("Selected nodes for the instance: %s" %
3550 (", ".join(ial.nodes),))
3551 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3552 (self.op.instance_name, self.op.iallocator, ial.nodes))
3553 if ial.required_nodes == 2:
3554 self.op.snode = ial.nodes[1]
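# Illustrative outcome: for a net-mirrored template the allocator must
# return exactly two nodes, e.g. ial.nodes == ["node1", "node2"], which
# become self.op.pnode and self.op.snode respectively; for a
# single-node template only the primary node is set.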
3556 def BuildHooksEnv(self):
3559 This runs on master, primary and secondary nodes of the instance.
3563 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3564 "INSTANCE_DISK_SIZE": self.op.disk_size,
3565 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3566 "INSTANCE_ADD_MODE": self.op.mode,
3568 if self.op.mode == constants.INSTANCE_IMPORT:
3569 env["INSTANCE_SRC_NODE"] = self.op.src_node
3570 env["INSTANCE_SRC_PATH"] = self.op.src_path
3571 env["INSTANCE_SRC_IMAGE"] = self.src_image
3573 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3574 primary_node=self.op.pnode,
3575 secondary_nodes=self.secondaries,
3576 status=self.instance_status,
3577 os_type=self.op.os_type,
3578 memory=self.op.mem_size,
3579 vcpus=self.op.vcpus,
3580 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3583 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3588 def CheckPrereq(self):
3589 """Check prerequisites.
3592 # set optional parameters to none if they don't exist
3593 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3594 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3595 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3596 if not hasattr(self.op, attr):
3597 setattr(self.op, attr, None)
3599 if self.op.mode not in (constants.INSTANCE_CREATE,
3600 constants.INSTANCE_IMPORT):
3601 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3604 if self.op.mode == constants.INSTANCE_IMPORT:
3605 src_node = getattr(self.op, "src_node", None)
3606 src_path = getattr(self.op, "src_path", None)
3607 if src_node is None or src_path is None:
3608 raise errors.OpPrereqError("Importing an instance requires source"
3609 " node and path options")
3610 src_node_full = self.cfg.ExpandNodeName(src_node)
3611 if src_node_full is None:
3612 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3613 self.op.src_node = src_node = src_node_full
3615 if not os.path.isabs(src_path):
3616 raise errors.OpPrereqError("The source path must be absolute")
3618 export_info = rpc.call_export_info(src_node, src_path)
3621 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3623 if not export_info.has_section(constants.INISECT_EXP):
3624 raise errors.ProgrammerError("Corrupted export config")
3626 ei_version = export_info.get(constants.INISECT_EXP, 'version')
if int(ei_version) != constants.EXPORT_VERSION:
3628 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3629 (ei_version, constants.EXPORT_VERSION))
3631 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3632 raise errors.OpPrereqError("Can't import instance with more than"
# FIXME: are the old OSes, disk sizes, etc. useful?
3636 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3637 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3639 self.src_image = diskimage
3640 else: # INSTANCE_CREATE
3641 if getattr(self.op, "os_type", None) is None:
3642 raise errors.OpPrereqError("No guest OS specified")
3644 #### instance parameters check
3646 # disk template and mirror node verification
3647 if self.op.disk_template not in constants.DISK_TEMPLATES:
3648 raise errors.OpPrereqError("Invalid disk template name")
3650 # instance name verification
3651 hostname1 = utils.HostInfo(self.op.instance_name)
3653 self.op.instance_name = instance_name = hostname1.name
3654 instance_list = self.cfg.GetInstanceList()
3655 if instance_name in instance_list:
3656 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3659 # ip validity checks
3660 ip = getattr(self.op, "ip", None)
3661 if ip is None or ip.lower() == "none":
3663 elif ip.lower() == "auto":
3664 inst_ip = hostname1.ip
3666 if not utils.IsValidIP(ip):
3667 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3668 " like a valid IP" % ip)
3670 self.inst_ip = self.op.ip = inst_ip
3672 if self.op.start and not self.op.ip_check:
3673 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3674 " adding an instance in start mode")
3676 if self.op.ip_check:
3677 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3678 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3679 (hostname1.ip, instance_name))
3681 # MAC address verification
3682 if self.op.mac != "auto":
3683 if not utils.IsValidMac(self.op.mac.lower()):
3684 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3687 # bridge verification
3688 bridge = getattr(self.op, "bridge", None)
3690 self.op.bridge = self.cfg.GetDefBridge()
3692 self.op.bridge = bridge
3694 # boot order verification
3695 if self.op.hvm_boot_order is not None:
3696 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3697 raise errors.OpPrereqError("invalid boot order specified,"
3698 " must be one or more of [acdn]")
3701 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3702 raise errors.OpPrereqError("One and only one of iallocator and primary"
3703 " node must be given")
3705 if self.op.iallocator is not None:
3706 self._RunAllocator()
3708 #### node related checks
3710 # check primary node
3711 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3713 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3715 self.op.pnode = pnode.name
3717 self.secondaries = []
3719 # mirror node verification
3720 if self.op.disk_template in constants.DTS_NET_MIRROR:
3721 if getattr(self.op, "snode", None) is None:
3722 raise errors.OpPrereqError("The networked disk templates need"
3725 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3726 if snode_name is None:
3727 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3729 elif snode_name == pnode.name:
3730 raise errors.OpPrereqError("The secondary node cannot be"
3731 " the primary node.")
3732 self.secondaries.append(snode_name)
3734 req_size = _ComputeDiskSize(self.op.disk_template,
3735 self.op.disk_size, self.op.swap_size)
3737 # Check lv size requirements
3738 if req_size is not None:
3739 nodenames = [pnode.name] + self.secondaries
3740 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3741 for node in nodenames:
3742 info = nodeinfo.get(node, None)
3744 raise errors.OpPrereqError("Cannot get current information"
3745 " from node '%s'" % node)
3746 vg_free = info.get('vg_free', None)
3747 if not isinstance(vg_free, int):
3748 raise errors.OpPrereqError("Can't compute free disk space on"
3750 if req_size > info['vg_free']:
3751 raise errors.OpPrereqError("Not enough disk space on target node %s."
3752 " %d MB available, %d MB required" %
3753 (node, info['vg_free'], req_size))
3756 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3758 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3759 " primary node" % self.op.os_type)
3761 if self.op.kernel_path == constants.VALUE_NONE:
3762 raise errors.OpPrereqError("Can't set instance kernel to none")
3765 # bridge check on primary node
3766 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3767 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3768 " destination node '%s'" %
3769 (self.op.bridge, pnode.name))
3771 # memory check on primary node
3773 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3774 "creating instance %s" % self.op.instance_name,
3777 # hvm_cdrom_image_path verification
3778 if self.op.hvm_cdrom_image_path is not None:
3779 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3780 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3781 " be an absolute path or None, not %s" %
3782 self.op.hvm_cdrom_image_path)
3783 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3784 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3785 " regular file or a symlink pointing to"
3786 " an existing regular file, not %s" %
3787 self.op.hvm_cdrom_image_path)
3789 # vnc_bind_address verification
3790 if self.op.vnc_bind_address is not None:
3791 if not utils.IsValidIP(self.op.vnc_bind_address):
3792 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3793 " like a valid IP address" %
3794 self.op.vnc_bind_address)
3796 # Xen HVM device type checks
3797 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3798 if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3799 raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3800 " hypervisor" % self.op.hvm_nic_type)
3801 if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3802 raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3803 " hypervisor" % self.op.hvm_disk_type)
3806 self.instance_status = 'up'
3808 self.instance_status = 'down'
3810 def Exec(self, feedback_fn):
3811 """Create and add the instance to the cluster.
3814 instance = self.op.instance_name
3815 pnode_name = self.pnode.name
3817 if self.op.mac == "auto":
3818 mac_address = self.cfg.GenerateMAC()
3820 mac_address = self.op.mac
3822 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3823 if self.inst_ip is not None:
3824 nic.ip = self.inst_ip
3826 ht_kind = self.sstore.GetHypervisorType()
3827 if ht_kind in constants.HTS_REQ_PORT:
3828 network_port = self.cfg.AllocatePort()
3832 if self.op.vnc_bind_address is None:
3833 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3835 disks = _GenerateDiskTemplate(self.cfg,
3836 self.op.disk_template,
3837 instance, pnode_name,
self.secondaries, self.op.disk_size,
self.op.swap_size)
3841 iobj = objects.Instance(name=instance, os=self.op.os_type,
3842 primary_node=pnode_name,
3843 memory=self.op.mem_size,
3844 vcpus=self.op.vcpus,
3845 nics=[nic], disks=disks,
3846 disk_template=self.op.disk_template,
3847 status=self.instance_status,
3848 network_port=network_port,
3849 kernel_path=self.op.kernel_path,
3850 initrd_path=self.op.initrd_path,
3851 hvm_boot_order=self.op.hvm_boot_order,
3852 hvm_acpi=self.op.hvm_acpi,
3853 hvm_pae=self.op.hvm_pae,
3854 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3855 vnc_bind_address=self.op.vnc_bind_address,
3856 hvm_nic_type=self.op.hvm_nic_type,
3857 hvm_disk_type=self.op.hvm_disk_type,
auto_balance=bool(self.op.auto_balance))
3861 feedback_fn("* creating instance disks...")
3862 if not _CreateDisks(self.cfg, iobj):
3863 _RemoveDisks(iobj, self.cfg)
3864 raise errors.OpExecError("Device creation failed, reverting...")
3866 feedback_fn("adding instance %s to cluster config" % instance)
3868 self.cfg.AddInstance(iobj)
3870 if self.op.wait_for_sync:
3871 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3872 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3873 # make sure the disks are not degraded (still sync-ing is ok)
3875 feedback_fn("* checking mirrors status")
3876 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3881 _RemoveDisks(iobj, self.cfg)
3882 self.cfg.RemoveInstance(iobj.name)
3883 raise errors.OpExecError("There are some degraded disks for"
3886 feedback_fn("creating os for instance %s on node %s" %
3887 (instance, pnode_name))
3889 if iobj.disk_template != constants.DT_DISKLESS:
3890 if self.op.mode == constants.INSTANCE_CREATE:
3891 feedback_fn("* running the instance OS create scripts...")
3892 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3893 raise errors.OpExecError("could not add os for instance %s"
" on node %s" % (instance, pnode_name))
3897 elif self.op.mode == constants.INSTANCE_IMPORT:
3898 feedback_fn("* running the instance OS import scripts...")
3899 src_node = self.op.src_node
3900 src_image = self.src_image
3901 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3902 src_node, src_image):
3903 raise errors.OpExecError("Could not import os for instance"
" %s on node %s" % (instance, pnode_name))
3907 # also checked in the prereq part
3908 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3912 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3913 feedback_fn("* starting instance...")
3914 if not rpc.call_instance_start(pnode_name, iobj, None):
3915 raise errors.OpExecError("Could not start instance")
3918 class LUConnectConsole(NoHooksLU):
3919 """Connect to an instance's console.
3921 This is somewhat special in that it returns the command line that
you need to run on the master node in order to connect to the console.
3926 _OP_REQP = ["instance_name"]
3928 def CheckPrereq(self):
3929 """Check prerequisites.
3931 This checks that the instance is in the cluster.
3934 instance = self.cfg.GetInstanceInfo(
3935 self.cfg.ExpandInstanceName(self.op.instance_name))
3936 if instance is None:
3937 raise errors.OpPrereqError("Instance '%s' not known" %
3938 self.op.instance_name)
3939 self.instance = instance
3941 def Exec(self, feedback_fn):
3942 """Connect to the console of an instance
3945 instance = self.instance
3946 node = instance.primary_node
3948 node_insts = rpc.call_instance_list([node])[node]
3949 if node_insts is False:
3950 raise errors.OpExecError("Can't connect to node %s." % node)
3952 if instance.name not in node_insts:
3953 raise errors.OpExecError("Instance %s is not running." % instance.name)
3955 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3957 hyper = hypervisor.GetHypervisor()
3958 console_cmd = hyper.GetShellCommandForConsole(instance)
3960 argv = ["ssh", "-q", "-t"]
3961 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3962 argv.extend(ssh.BATCH_MODE_OPTS)
3964 argv.append(console_cmd)
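# Illustrative final command line (node name, ssh options and console
# command are examples only; the exact argv elements between the ssh
# options and the console command depend on the ssh helper constants):
#
#   ssh -q -t -o StrictHostKeyChecking=yes ... root@node1 "xm console inst1"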
3968 class LUAddMDDRBDComponent(LogicalUnit):
3969 """Adda new mirror member to an instance's disk.
3972 HPATH = "mirror-add"
3973 HTYPE = constants.HTYPE_INSTANCE
3974 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3976 def BuildHooksEnv(self):
3979 This runs on the master, the primary and all the secondaries.
3983 "NEW_SECONDARY": self.op.remote_node,
3984 "DISK_NAME": self.op.disk_name,
3986 env.update(_BuildInstanceHookEnvByObject(self.instance))
3987 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3988 self.op.remote_node,] + list(self.instance.secondary_nodes)
3991 def CheckPrereq(self):
3992 """Check prerequisites.
3994 This checks that the instance is in the cluster.
3997 instance = self.cfg.GetInstanceInfo(
3998 self.cfg.ExpandInstanceName(self.op.instance_name))
3999 if instance is None:
4000 raise errors.OpPrereqError("Instance '%s' not known" %
4001 self.op.instance_name)
4002 self.instance = instance
4004 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4005 if remote_node is None:
4006 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
4007 self.remote_node = remote_node
4009 if remote_node == instance.primary_node:
4010       raise errors.OpPrereqError("The specified node is the primary node of"
4011                                  " the instance.")
4013     if instance.disk_template != constants.DT_REMOTE_RAID1:
4014       raise errors.OpPrereqError("Instance's disk layout is not"
4015                                  " remote_raid1.")
4016     for disk in instance.disks:
4017       if disk.iv_name == self.op.disk_name:
4018         break
4019     else:
4020       raise errors.OpPrereqError("Can't find this device ('%s') in the"
4021                                  " instance." % self.op.disk_name)
4022     if len(disk.children) > 1:
4023       raise errors.OpPrereqError("The device already has two slave devices."
4024                                  " This would create a 3-disk raid1 which we"
4025                                  " don't allow.")
4026     self.disk = disk
4028   def Exec(self, feedback_fn):
4029     """Add the mirror component
4031     """
4032     disk = self.disk
4033     instance = self.instance
4035 remote_node = self.remote_node
4036 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
4037 names = _GenerateUniqueNames(self.cfg, lv_names)
4038 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
4039 remote_node, disk.size, names)
4041 logger.Info("adding new mirror component on secondary")
4043     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
4044                                       new_drbd, False,
4045                                       _GetInstanceInfoText(instance)):
4046 raise errors.OpExecError("Failed to create new component on secondary"
4047 " node %s" % remote_node)
4049 logger.Info("adding new mirror component on primary")
4051     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
4052                                     new_drbd,
4053                                     _GetInstanceInfoText(instance)):
4054 # remove secondary dev
4055 self.cfg.SetDiskID(new_drbd, remote_node)
4056 rpc.call_blockdev_remove(remote_node, new_drbd)
4057 raise errors.OpExecError("Failed to create volume on primary")
4059 # the device exists now
4060 # call the primary node to add the mirror to md
4061 logger.Info("adding new mirror component to md")
4062     if not rpc.call_blockdev_addchildren(instance.primary_node,
4063                                          disk, [new_drbd]):
4064       logger.Error("Can't add mirror component to md!")
4065 self.cfg.SetDiskID(new_drbd, remote_node)
4066 if not rpc.call_blockdev_remove(remote_node, new_drbd):
4067 logger.Error("Can't rollback on secondary")
4068 self.cfg.SetDiskID(new_drbd, instance.primary_node)
4069 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4070 logger.Error("Can't rollback on primary")
4071 raise errors.OpExecError("Can't add mirror component to md array")
4073 disk.children.append(new_drbd)
4075 self.cfg.AddInstance(instance)
4077 _WaitForSync(self.cfg, instance, self.proc)
4082 class LURemoveMDDRBDComponent(LogicalUnit):
4083 """Remove a component from a remote_raid1 disk.
4086 HPATH = "mirror-remove"
4087 HTYPE = constants.HTYPE_INSTANCE
4088 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
4090 def BuildHooksEnv(self):
4093 This runs on the master, the primary and all the secondaries.
4096     env = {
4097       "DISK_NAME": self.op.disk_name,
4098       "DISK_ID": self.op.disk_id,
4099       "OLD_SECONDARY": self.old_secondary,
4100       }
4101     env.update(_BuildInstanceHookEnvByObject(self.instance))
4102     nl = [self.sstore.GetMasterNode(),
4103           self.instance.primary_node] + list(self.instance.secondary_nodes)
4104     return env, nl, nl
4106 def CheckPrereq(self):
4107 """Check prerequisites.
4109 This checks that the instance is in the cluster.
4112 instance = self.cfg.GetInstanceInfo(
4113 self.cfg.ExpandInstanceName(self.op.instance_name))
4114 if instance is None:
4115 raise errors.OpPrereqError("Instance '%s' not known" %
4116 self.op.instance_name)
4117 self.instance = instance
4119     if instance.disk_template != constants.DT_REMOTE_RAID1:
4120       raise errors.OpPrereqError("Instance's disk layout is not"
4121                                  " remote_raid1.")
4122     for disk in instance.disks:
4123       if disk.iv_name == self.op.disk_name:
4124         break
4125     else:
4126       raise errors.OpPrereqError("Can't find this device ('%s') in the"
4127                                  " instance." % self.op.disk_name)
4128     for child in disk.children:
4129       if (child.dev_type == constants.LD_DRBD7 and
4130           child.logical_id[2] == self.op.disk_id):
4131         break
4132     else:
4133       raise errors.OpPrereqError("Can't find the device with this port.")
4135     if len(disk.children) < 2:
4136       raise errors.OpPrereqError("Cannot remove the last component from"
4137                                  " a mirror.")
4138     self.disk = disk
4139     self.child = child
4140     if self.child.logical_id[0] == instance.primary_node:
4141       oid = 1
4142     else:
4143       oid = 0
4144     self.old_secondary = self.child.logical_id[oid]
4146 def Exec(self, feedback_fn):
4147 """Remove the mirror component
4150     instance = self.instance
4151     disk = self.disk
4152     child = self.child
4153     logger.Info("remove mirror component")
4154     self.cfg.SetDiskID(disk, instance.primary_node)
4155     if not rpc.call_blockdev_removechildren(instance.primary_node,
4156                                             disk, [child]):
4157       raise errors.OpExecError("Can't remove child from mirror.")
4159 for node in child.logical_id[:2]:
4160 self.cfg.SetDiskID(child, node)
4161 if not rpc.call_blockdev_remove(node, child):
4162 logger.Error("Warning: failed to remove device from node %s,"
4163 " continuing operation." % node)
4165 disk.children.remove(child)
4166 self.cfg.AddInstance(instance)
4169 class LUReplaceDisks(LogicalUnit):
4170 """Replace the disks of an instance.
4173 HPATH = "mirrors-replace"
4174 HTYPE = constants.HTYPE_INSTANCE
4175 _OP_REQP = ["instance_name", "mode", "disks"]
4177 def _RunAllocator(self):
4178 """Compute a new secondary node using an IAllocator.
4181 ial = IAllocator(self.cfg, self.sstore,
4182 mode=constants.IALLOCATOR_MODE_RELOC,
4183 name=self.op.instance_name,
4184 relocate_from=[self.sec_node])
4186 ial.Run(self.op.iallocator)
4188     if not ial.success:
4189       raise errors.OpPrereqError("Can't compute nodes using"
4190                                  " iallocator '%s': %s" % (self.op.iallocator,
4191                                                            ial.info))
4192     if len(ial.nodes) != ial.required_nodes:
4193       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4194                                  " of nodes (%s), required %s" %
4195                                  (self.op.iallocator, len(ial.nodes), ial.required_nodes))
4196 self.op.remote_node = ial.nodes[0]
4197 logger.ToStdout("Selected new secondary for the instance: %s" %
4198 self.op.remote_node)
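    # Usage sketch (hypothetical allocator name): an OpReplaceDisks submitted
    # with iallocator="my-allocator" and no remote_node runs the external
    # script in relocation mode; the single node name it returns is then used
    # exactly as if the user had passed it as the new secondary.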
4200 def BuildHooksEnv(self):
4203 This runs on the master, the primary and all the secondaries.
4206     env = {
4207       "MODE": self.op.mode,
4208       "NEW_SECONDARY": self.op.remote_node,
4209       "OLD_SECONDARY": self.instance.secondary_nodes[0],
4210       }
4211     env.update(_BuildInstanceHookEnvByObject(self.instance))
4212     nl = [
4213       self.sstore.GetMasterNode(),
4214       self.instance.primary_node,
4215       ]
4216     if self.op.remote_node is not None:
4217       nl.append(self.op.remote_node)
4218     return env, nl, nl
4220 def CheckPrereq(self):
4221 """Check prerequisites.
4223 This checks that the instance is in the cluster.
4226 if not hasattr(self.op, "remote_node"):
4227 self.op.remote_node = None
4229 instance = self.cfg.GetInstanceInfo(
4230 self.cfg.ExpandInstanceName(self.op.instance_name))
4231 if instance is None:
4232 raise errors.OpPrereqError("Instance '%s' not known" %
4233 self.op.instance_name)
4234 self.instance = instance
4235 self.op.instance_name = instance.name
4237 if instance.disk_template not in constants.DTS_NET_MIRROR:
4238 raise errors.OpPrereqError("Instance's disk layout is not"
4239 " network mirrored.")
4241 if len(instance.secondary_nodes) != 1:
4242 raise errors.OpPrereqError("The instance has a strange layout,"
4243 " expected one secondary but found %d" %
4244 len(instance.secondary_nodes))
4246 self.sec_node = instance.secondary_nodes[0]
4248 ia_name = getattr(self.op, "iallocator", None)
4249 if ia_name is not None:
4250 if self.op.remote_node is not None:
4251 raise errors.OpPrereqError("Give either the iallocator or the new"
4252 " secondary, not both")
4253 self._RunAllocator()
4255 remote_node = self.op.remote_node
4256 if remote_node is not None:
4257 remote_node = self.cfg.ExpandNodeName(remote_node)
4258 if remote_node is None:
4259 raise errors.OpPrereqError("Node '%s' not known" %
4260 self.op.remote_node)
4261       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4262     else:
4263       self.remote_node_info = None
4264 if remote_node == instance.primary_node:
4265       raise errors.OpPrereqError("The specified node is the primary node of"
4266                                  " the instance.")
4267 elif remote_node == self.sec_node:
4268 if self.op.mode == constants.REPLACE_DISK_SEC:
4269 # this is for DRBD8, where we can't execute the same mode of
4270 # replacement as for drbd7 (no different port allocated)
4271         raise errors.OpPrereqError("Same secondary given, cannot execute"
4272                                    " replacement")
4273 # the user gave the current secondary, switch to
4274       # 'no-replace-secondary' mode for drbd7
4275       remote_node = None
4276 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
4277 self.op.mode != constants.REPLACE_DISK_ALL):
4278 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
4279 " disks replacement, not individual ones")
4280 if instance.disk_template == constants.DT_DRBD8:
4281 if (self.op.mode == constants.REPLACE_DISK_ALL and
4282 remote_node is not None):
4283 # switch to replace secondary mode
4284 self.op.mode = constants.REPLACE_DISK_SEC
4286 if self.op.mode == constants.REPLACE_DISK_ALL:
4287 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4288 " secondary disk replacement, not"
4290 elif self.op.mode == constants.REPLACE_DISK_PRI:
4291 if remote_node is not None:
4292 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4293 " the secondary while doing a primary"
4294 " node disk replacement")
4295 self.tgt_node = instance.primary_node
4296 self.oth_node = instance.secondary_nodes[0]
4297 elif self.op.mode == constants.REPLACE_DISK_SEC:
4298 self.new_node = remote_node # this can be None, in which case
4299 # we don't change the secondary
4300 self.tgt_node = instance.secondary_nodes[0]
4301 self.oth_node = instance.primary_node
4302     else:
4303       raise errors.ProgrammerError("Unhandled disk replace mode")
4305 for name in self.op.disks:
4306 if instance.FindDisk(name) is None:
4307 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4308 (name, instance.name))
4309 self.op.remote_node = remote_node
4311 def _ExecRR1(self, feedback_fn):
4312 """Replace the disks of an instance.
4315     instance = self.instance
4316     iv_names = {}
4317     cfg = self.cfg
4318     if self.op.remote_node is None:
4319       remote_node = self.sec_node
4320     else:
4321       remote_node = self.op.remote_node
4323     for dev in instance.disks:
4324       size = dev.size
4325       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4326 names = _GenerateUniqueNames(cfg, lv_names)
4327 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
4328 remote_node, size, names)
4329 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
4330       logger.Info("adding new mirror component on secondary for %s" %
4331                   dev.iv_name)
4333       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
4334                                         new_drbd, False,
4335                                         _GetInstanceInfoText(instance)):
4336         raise errors.OpExecError("Failed to create new component on secondary"
4337                                  " node %s. Full abort, cleanup manually!" %
4338                                  remote_node)
4340 logger.Info("adding new mirror component on primary")
4342       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
4343                                       new_drbd,
4344                                       _GetInstanceInfoText(instance)):
4345 # remove secondary dev
4346 cfg.SetDiskID(new_drbd, remote_node)
4347 rpc.call_blockdev_remove(remote_node, new_drbd)
4348 raise errors.OpExecError("Failed to create volume on primary!"
4349 " Full abort, cleanup manually!!")
4351 # the device exists now
4352 # call the primary node to add the mirror to md
4353 logger.Info("adding new mirror component to md")
4354       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
4355                                            [new_drbd]):
4356         logger.Error("Can't add mirror component to md!")
4357 cfg.SetDiskID(new_drbd, remote_node)
4358 if not rpc.call_blockdev_remove(remote_node, new_drbd):
4359 logger.Error("Can't rollback on secondary")
4360 cfg.SetDiskID(new_drbd, instance.primary_node)
4361 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
4362 logger.Error("Can't rollback on primary")
4363 raise errors.OpExecError("Full abort, cleanup manually!!")
4365 dev.children.append(new_drbd)
4366 cfg.AddInstance(instance)
4368 # this can fail as the old devices are degraded and _WaitForSync
4369     # does a combined result over all disks, so we don't check its
4370     # return value
4371     _WaitForSync(cfg, instance, self.proc, unlock=True)
4373 # so check manually all the devices
4374 for name in iv_names:
4375 dev, child, new_drbd = iv_names[name]
4376 cfg.SetDiskID(dev, instance.primary_node)
4377       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4378       if is_degr:
4379         raise errors.OpExecError("MD device %s is degraded!" % name)
4380       cfg.SetDiskID(new_drbd, instance.primary_node)
4381       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
4382       if is_degr:
4383         raise errors.OpExecError("New drbd device %s is degraded!" % name)
4385 for name in iv_names:
4386 dev, child, new_drbd = iv_names[name]
4387 logger.Info("remove mirror %s component" % name)
4388 cfg.SetDiskID(dev, instance.primary_node)
4389       if not rpc.call_blockdev_removechildren(instance.primary_node,
4390                                               dev, [child]):
4391         logger.Error("Can't remove child from mirror, aborting"
4392                      " *this device cleanup*.\nYou need to cleanup manually!!")
4393         continue
4395       for node in child.logical_id[:2]:
4396 logger.Info("remove child device on %s" % node)
4397 cfg.SetDiskID(child, node)
4398 if not rpc.call_blockdev_remove(node, child):
4399 logger.Error("Warning: failed to remove device from node %s,"
4400 " continuing operation." % node)
4402 dev.children.remove(child)
4404 cfg.AddInstance(instance)
4406 def _ExecD8DiskOnly(self, feedback_fn):
4407     """Replace a disk on the primary or secondary for drbd8.
4409 The algorithm for replace is quite complicated:
4410 - for each disk to be replaced:
4411 - create new LVs on the target node with unique names
4412 - detach old LVs from the drbd device
4413 - rename old LVs to name_replaced.<time_t>
4414 - rename new LVs to old LVs
4415 - attach the new LVs (with the old names now) to the drbd device
4416 - wait for sync across all devices
4417 - for each modified disk:
4418 - remove old LVs (which have the name name_replaces.<time_t>)
4420     Failures are not very well handled.
4422     """
4423     steps_total = 6
4424     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4425     instance = self.instance
4426     iv_names = {}
4427     vgname = self.cfg.GetVGName()
4429     cfg = self.cfg
4430     tgt_node = self.tgt_node
4431     oth_node = self.oth_node
4433 # Step: check device activation
4434 self.proc.LogStep(1, steps_total, "check device existence")
4435 info("checking volume groups")
4436 my_vg = cfg.GetVGName()
4437     results = rpc.call_vg_list([oth_node, tgt_node])
4438     if not results:
4439       raise errors.OpExecError("Can't list volume groups on the nodes")
4440 for node in oth_node, tgt_node:
4441 res = results.get(node, False)
4442       if not res or my_vg not in res:
4443         raise errors.OpExecError("Volume group '%s' not found on %s" %
4444                                  (my_vg, node))
4445 for dev in instance.disks:
4446       if not dev.iv_name in self.op.disks:
4447         continue
4448       for node in tgt_node, oth_node:
4449 info("checking %s on %s" % (dev.iv_name, node))
4450 cfg.SetDiskID(dev, node)
4451 if not rpc.call_blockdev_find(node, dev):
4452 raise errors.OpExecError("Can't find device %s on node %s" %
4453 (dev.iv_name, node))
4455 # Step: check other node consistency
4456 self.proc.LogStep(2, steps_total, "check peer consistency")
4457 for dev in instance.disks:
4458       if not dev.iv_name in self.op.disks:
4459         continue
4460       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4461 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4462 oth_node==instance.primary_node):
4463 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4464 " to replace disks on this node (%s)" %
4465 (oth_node, tgt_node))
4467 # Step: create new storage
4468 self.proc.LogStep(3, steps_total, "allocate new storage")
4469 for dev in instance.disks:
4470       if not dev.iv_name in self.op.disks:
4471         continue
4472       size = dev.size
4473       cfg.SetDiskID(dev, tgt_node)
4474 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4475 names = _GenerateUniqueNames(cfg, lv_names)
4476 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4477 logical_id=(vgname, names[0]))
4478 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4479 logical_id=(vgname, names[1]))
4480 new_lvs = [lv_data, lv_meta]
4481 old_lvs = dev.children
4482 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4483 info("creating new local storage on %s for %s" %
4484 (tgt_node, dev.iv_name))
4485 # since we *always* want to create this LV, we use the
4486 # _Create...OnPrimary (which forces the creation), even if we
4487 # are talking about the secondary node
4488 for new_lv in new_lvs:
4489 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4490 _GetInstanceInfoText(instance)):
4491           raise errors.OpExecError("Failed to create new LV named '%s' on"
4492                                    " node '%s'" %
4493                                    (new_lv.logical_id[1], tgt_node))
4495 # Step: for each lv, detach+rename*2+attach
4496 self.proc.LogStep(4, steps_total, "change drbd configuration")
4497 for dev, old_lvs, new_lvs in iv_names.itervalues():
4498 info("detaching %s drbd from local storage" % dev.iv_name)
4499 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4500 raise errors.OpExecError("Can't detach drbd from local storage on node"
4501 " %s for device %s" % (tgt_node, dev.iv_name))
4503 #cfg.Update(instance)
4505 # ok, we created the new LVs, so now we know we have the needed
4506 # storage; as such, we proceed on the target node to rename
4507 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4508 # using the assumption that logical_id == physical_id (which in
4509 # turn is the unique_id on that node)
4511 # FIXME(iustin): use a better name for the replaced LVs
4512 temp_suffix = int(time.time())
4513 ren_fn = lambda d, suff: (d.physical_id[0],
4514 d.physical_id[1] + "_replaced-%s" % suff)
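      # Illustration of the rename scheme (hypothetical names): with a volume
      # group 'xenvg' and temp_suffix 1199145600, the old data LV
      #   ('xenvg', 'abc123.sda_data')
      # becomes
      #   ('xenvg', 'abc123.sda_data_replaced-1199145600')
      # and the freshly created LV is then renamed to the original name, so
      # the drbd device never sees a name change.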
4515       # build the rename list based on what LVs exist on the node
4516       rlist = []
4517       for to_ren in old_lvs:
4518 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4519 if find_res is not None: # device exists
4520 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4522 info("renaming the old LVs on the target node")
4523 if not rpc.call_blockdev_rename(tgt_node, rlist):
4524 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4525 # now we rename the new LVs to the old LVs
4526 info("renaming the new LVs on the target node")
4527 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4528 if not rpc.call_blockdev_rename(tgt_node, rlist):
4529 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4531 for old, new in zip(old_lvs, new_lvs):
4532 new.logical_id = old.logical_id
4533 cfg.SetDiskID(new, tgt_node)
4535 for disk in old_lvs:
4536 disk.logical_id = ren_fn(disk, temp_suffix)
4537 cfg.SetDiskID(disk, tgt_node)
4539 # now that the new lvs have the old name, we can add them to the device
4540 info("adding new mirror component on %s" % tgt_node)
4541 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4542 for new_lv in new_lvs:
4543 if not rpc.call_blockdev_remove(tgt_node, new_lv):
4544             warning("Can't rollback device %s" % new_lv,
4545                     hint="manually cleanup unused storage")
4546 raise errors.OpExecError("Can't add local storage to drbd")
4548 dev.children = new_lvs
4549 cfg.Update(instance)
4551 # Step: wait for sync
4553 # this can fail as the old devices are degraded and _WaitForSync
4554     # does a combined result over all disks, so we don't check its
4555     # return value
4556     self.proc.LogStep(5, steps_total, "sync devices")
4557 _WaitForSync(cfg, instance, self.proc, unlock=True)
4559 # so check manually all the devices
4560 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4561 cfg.SetDiskID(dev, instance.primary_node)
4562       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4563       if is_degr:
4564         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4566 # Step: remove old storage
4567 self.proc.LogStep(6, steps_total, "removing old storage")
4568 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4569       info("remove logical volumes for %s" % name)
4570       for lv in old_lvs:
4571         cfg.SetDiskID(lv, tgt_node)
4572 if not rpc.call_blockdev_remove(tgt_node, lv):
4573 warning("Can't remove old LV", hint="manually remove unused LVs")
4576 def _ExecD8Secondary(self, feedback_fn):
4577 """Replace the secondary node for drbd8.
4579 The algorithm for replace is quite complicated:
4580 - for all disks of the instance:
4581 - create new LVs on the new node with same names
4582 - shutdown the drbd device on the old secondary
4583 - disconnect the drbd network on the primary
4584 - create the drbd device on the new secondary
4585 - network attach the drbd on the primary, using an artifice:
4586 the drbd code for Attach() will connect to the network if it
4587       finds a device which is connected to the good local disks but
4588       not network enabled
4589     - wait for sync across all devices
4590 - remove all disks from the old secondary
4592     Failures are not very well handled.
4594     """
4595     steps_total = 6
4596     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4597     instance = self.instance
4598     iv_names = {}
4599     vgname = self.cfg.GetVGName()
4601     cfg = self.cfg
4602     old_node = self.tgt_node
4603 new_node = self.new_node
4604 pri_node = instance.primary_node
4606 # Step: check device activation
4607 self.proc.LogStep(1, steps_total, "check device existence")
4608 info("checking volume groups")
4609 my_vg = cfg.GetVGName()
4610     results = rpc.call_vg_list([pri_node, new_node])
4611     if not results:
4612       raise errors.OpExecError("Can't list volume groups on the nodes")
4613 for node in pri_node, new_node:
4614 res = results.get(node, False)
4615       if not res or my_vg not in res:
4616         raise errors.OpExecError("Volume group '%s' not found on %s" %
4617                                  (my_vg, node))
4618 for dev in instance.disks:
4619       if not dev.iv_name in self.op.disks:
4620         continue
4621       info("checking %s on %s" % (dev.iv_name, pri_node))
4622 cfg.SetDiskID(dev, pri_node)
4623 if not rpc.call_blockdev_find(pri_node, dev):
4624 raise errors.OpExecError("Can't find device %s on node %s" %
4625 (dev.iv_name, pri_node))
4627 # Step: check other node consistency
4628 self.proc.LogStep(2, steps_total, "check peer consistency")
4629 for dev in instance.disks:
4630       if not dev.iv_name in self.op.disks:
4631         continue
4632       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4633 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4634 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4635                                  " unsafe to replace the secondary" %
4636                                  pri_node)
4638 # Step: create new storage
4639 self.proc.LogStep(3, steps_total, "allocate new storage")
4640 for dev in instance.disks:
4642 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4643 # since we *always* want to create this LV, we use the
4644 # _Create...OnPrimary (which forces the creation), even if we
4645 # are talking about the secondary node
4646 for new_lv in dev.children:
4647 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4648 _GetInstanceInfoText(instance)):
4649 raise errors.OpExecError("Failed to create new LV named '%s' on"
4651 (new_lv.logical_id[1], new_node))
4653 iv_names[dev.iv_name] = (dev, dev.children)
4655 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4656 for dev in instance.disks:
4658 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4659 # create new devices on new_node
4660 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4661                               logical_id=(pri_node, new_node,
4662                                           dev.logical_id[2]),
4663                               children=dev.children)
4664       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4665                                         new_drbd, False,
4666                                         _GetInstanceInfoText(instance)):
4667 raise errors.OpExecError("Failed to create new DRBD on"
4668 " node '%s'" % new_node)
4670 for dev in instance.disks:
4671 # we have new devices, shutdown the drbd on the old secondary
4672 info("shutting down drbd for %s on old node" % dev.iv_name)
4673 cfg.SetDiskID(dev, old_node)
4674 if not rpc.call_blockdev_shutdown(old_node, dev):
4675 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4676 hint="Please cleanup this device manually as soon as possible")
4678     info("detaching primary drbds from the network (=> standalone)")
4679     done = 0
4680     for dev in instance.disks:
4681 cfg.SetDiskID(dev, pri_node)
4682 # set the physical (unique in bdev terms) id to None, meaning
4683 # detach from network
4684 dev.physical_id = (None,) * len(dev.physical_id)
4685       # and 'find' the device, which will 'fix' it to match the
4686       # standalone state
4687       if rpc.call_blockdev_find(pri_node, dev):
4688         done += 1
4689       else:
4690         warning("Failed to detach drbd %s from network, unusual case" %
4691                 dev.iv_name)
4693     if not done:
4694       # no detaches succeeded (very unlikely)
4695       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4697 # if we managed to detach at least one, we update all the disks of
4698 # the instance to point to the new secondary
4699 info("updating instance configuration")
4700 for dev in instance.disks:
4701 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4702 cfg.SetDiskID(dev, pri_node)
4703 cfg.Update(instance)
4705 # and now perform the drbd attach
4706 info("attaching primary drbds to new secondary (standalone => connected)")
4708 for dev in instance.disks:
4709 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4710 # since the attach is smart, it's enough to 'find' the device,
4711       # it will automatically activate the network, if the physical_id
4712       # is correct
4713       cfg.SetDiskID(dev, pri_node)
4714 if not rpc.call_blockdev_find(pri_node, dev):
4715 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4716 "please do a gnt-instance info to see the status of disks")
4718 # this can fail as the old devices are degraded and _WaitForSync
4719     # does a combined result over all disks, so we don't check its
4720     # return value
4721     self.proc.LogStep(5, steps_total, "sync devices")
4722 _WaitForSync(cfg, instance, self.proc, unlock=True)
4724 # so check manually all the devices
4725 for name, (dev, old_lvs) in iv_names.iteritems():
4726 cfg.SetDiskID(dev, pri_node)
4727       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4728       if is_degr:
4729         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4731 self.proc.LogStep(6, steps_total, "removing old storage")
4732 for name, (dev, old_lvs) in iv_names.iteritems():
4733       info("remove logical volumes for %s" % name)
4734       for lv in old_lvs:
4735         cfg.SetDiskID(lv, old_node)
4736 if not rpc.call_blockdev_remove(old_node, lv):
4737 warning("Can't remove LV on old secondary",
4738 hint="Cleanup stale volumes by hand")
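    # Note on the standalone => connected trick used above: blanking the
    # physical_id and re-running blockdev_find makes each drbd device
    # reattach according to its updated logical_id; e.g. (hypothetical)
    # a logical_id of ('node1', 'node2', 11000) rewritten to
    # ('node1', 'node3', 11000) makes the primary connect to the new
    # secondary on the same drbd port.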
4740 def Exec(self, feedback_fn):
4741 """Execute disk replacement.
4743 This dispatches the disk replacement to the appropriate handler.
4746 instance = self.instance
4748 # Activate the instance disks if we're replacing them on a down instance
4749 if instance.status == "down":
4750 op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
4751 self.proc.ChainOpCode(op)
4753     if instance.disk_template == constants.DT_REMOTE_RAID1:
4754       fn = self._ExecRR1
4755     elif instance.disk_template == constants.DT_DRBD8:
4756 if self.op.remote_node is None:
4757         fn = self._ExecD8DiskOnly
4758       else:
4759         fn = self._ExecD8Secondary
4760     else:
4761       raise errors.ProgrammerError("Unhandled disk replacement case")
4763 ret = fn(feedback_fn)
4765 # Deactivate the instance disks if we're replacing them on a down instance
4766 if instance.status == "down":
4767 op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
4768       self.proc.ChainOpCode(op)
4770     return ret
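    # Dispatch summary: remote_raid1 instances always go through _ExecRR1;
    # for drbd8, no remote_node means "replace the local storage in place"
    # (_ExecD8DiskOnly) while a given remote_node means "move the mirror to
    # that node" (_ExecD8Secondary).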
4773 class LUGrowDisk(LogicalUnit):
4774   """Grow a disk of an instance.
4776   """
4777   HPATH = "disk-grow"
4778   HTYPE = constants.HTYPE_INSTANCE
4779 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4781 def BuildHooksEnv(self):
4784 This runs on the master, the primary and all the secondaries.
4787     env = {
4788       "DISK": self.op.disk,
4789       "AMOUNT": self.op.amount,
4790       }
4791     env.update(_BuildInstanceHookEnvByObject(self.instance))
4792     nl = [
4793       self.sstore.GetMasterNode(),
4794       self.instance.primary_node,
4795       ] + list(self.instance.secondary_nodes)
4796     return env, nl, nl
4798 def CheckPrereq(self):
4799 """Check prerequisites.
4801 This checks that the instance is in the cluster.
4804 instance = self.cfg.GetInstanceInfo(
4805 self.cfg.ExpandInstanceName(self.op.instance_name))
4806 if instance is None:
4807 raise errors.OpPrereqError("Instance '%s' not known" %
4808 self.op.instance_name)
4810 if self.op.amount <= 0:
4811 raise errors.OpPrereqError("Invalid grow-by amount: %s" % self.op.amount)
4813 self.instance = instance
4814 self.op.instance_name = instance.name
4816 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4817 raise errors.OpPrereqError("Instance's disk layout does not support"
4820 self.disk = instance.FindDisk(self.op.disk)
4821 if self.disk is None:
4822 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4823 (self.op.disk, instance.name))
4825 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4826 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4827 for node in nodenames:
4828       info = nodeinfo.get(node, None)
4829       if not info:
4830         raise errors.OpPrereqError("Cannot get current information"
4831                                    " from node '%s'" % node)
4832 vg_free = info.get('vg_free', None)
4833 if not isinstance(vg_free, int):
4834         raise errors.OpPrereqError("Can't compute free disk space on"
4835                                    " node %s" % node)
4836       if self.op.amount > info['vg_free']:
4837 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4838 " %d MiB available, %d MiB required" %
4839 (node, info['vg_free'], self.op.amount))
4840 is_primary = (node == instance.primary_node)
4841 if not _CheckDiskConsistency(self.cfg, self.disk, node, is_primary):
4842 raise errors.OpPrereqError("Disk %s is degraded or not fully"
4843 " synchronized on node %s,"
4844 " aborting grow." % (self.op.disk, node))
4846 def Exec(self, feedback_fn):
4847     """Execute disk grow.
4849     """
4850     instance = self.instance
4851     disk = self.disk
4852     for node in (instance.secondary_nodes + (instance.primary_node,)):
4853 self.cfg.SetDiskID(disk, node)
4854 result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4855 if not result or not isinstance(result, tuple) or len(result) != 2:
4856         raise errors.OpExecError("grow request failed to node %s" % node)
4857       elif not result[0]:
4858         raise errors.OpExecError("grow request failed to node %s: %s" %
4859                                  (node, result[1]))
4860 disk.RecordGrow(self.op.amount)
4861 self.cfg.Update(instance)
4862 if self.op.wait_for_sync:
4863       disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4864       if disk_abort:
4865         logger.Error("Warning: disk sync-ing has not returned a good status.\n"
4866                      " Please check the instance.")
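    # Worked example (hypothetical values): growing disk 'sda' of a drbd8
    # instance by amount=1024 (MiB) requires at least 1024 MiB of vg_free on
    # the primary and on the secondary node, since both halves of the mirror
    # grow by the same amount before the device is re-synced.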
4869 class LUQueryInstanceData(NoHooksLU):
4870 """Query runtime instance data.
4873 _OP_REQP = ["instances", "static"]
4875 def CheckPrereq(self):
4876 """Check prerequisites.
4878 This only checks the optional instance list against the existing names.
4881 if not isinstance(self.op.instances, list):
4882 raise errors.OpPrereqError("Invalid argument type 'instances'")
4883 if self.op.instances:
4884 self.wanted_instances = []
4885       names = self.op.instances
4886       for name in names:
4887         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4888 if instance is None:
4889 raise errors.OpPrereqError("No such instance name '%s'" % name)
4890         self.wanted_instances.append(instance)
4891     else:
4892       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4893 in self.cfg.GetInstanceList()]
4897 def _ComputeDiskStatus(self, instance, snode, dev):
4898 """Compute block device status.
4901     static = self.op.static
4902     if not static:
4903       self.cfg.SetDiskID(dev, instance.primary_node)
4904       dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4905     else:
4906       dev_pstatus = None
4908 if dev.dev_type in constants.LDS_DRBD:
4909 # we change the snode then (otherwise we use the one passed in)
4910 if dev.logical_id[0] == instance.primary_node:
4911 snode = dev.logical_id[1]
4913 snode = dev.logical_id[0]
4915 if snode and not static:
4916 self.cfg.SetDiskID(dev, snode)
4917       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4918     else:
4919       dev_sstatus = None
4921     if dev.children:
4922       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4923                       for child in dev.children]
4924     else:
4925       dev_children = []
4927     data = {
4928       "iv_name": dev.iv_name,
4929 "dev_type": dev.dev_type,
4930 "logical_id": dev.logical_id,
4931 "physical_id": dev.physical_id,
4932 "pstatus": dev_pstatus,
4933 "sstatus": dev_sstatus,
4934       "children": dev_children,
4935       }
4937     return data
4939 def Exec(self, feedback_fn):
4940     """Gather and return data"""
4941     result = {}
4942     for instance in self.wanted_instances:
4943       if not self.op.static:
4944         remote_info = rpc.call_instance_info(instance.primary_node,
4945                                              instance.name)
4946         if remote_info and "state" in remote_info:
4947           remote_state = "up"
4948         else:
4949           remote_state = "down"
4950       else:
4951         remote_state = None
4952       if instance.status == "down":
4953         config_state = "down"
4954       else:
4955         config_state = "up"
4957 disks = [self._ComputeDiskStatus(instance, None, device)
4958                for device in instance.disks]
4960       idict = {
4961         "name": instance.name,
4962 "config_state": config_state,
4963 "run_state": remote_state,
4964 "pnode": instance.primary_node,
4965         "snodes": instance.secondary_nodes,
4966         "os": instance.os,
4967         "memory": instance.memory,
4968         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4969         "disks": disks,
4970         "vcpus": instance.vcpus,
4971         "auto_balance": instance.auto_balance,
4972         }
4974       htkind = self.sstore.GetHypervisorType()
4975 if htkind == constants.HT_XEN_PVM30:
4976 idict["kernel_path"] = instance.kernel_path
4977 idict["initrd_path"] = instance.initrd_path
4979 if htkind == constants.HT_XEN_HVM31:
4980 idict["hvm_boot_order"] = instance.hvm_boot_order
4981 idict["hvm_acpi"] = instance.hvm_acpi
4982 idict["hvm_pae"] = instance.hvm_pae
4983 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4984 idict["hvm_nic_type"] = instance.hvm_nic_type
4985 idict["hvm_disk_type"] = instance.hvm_disk_type
4987 if htkind in constants.HTS_REQ_PORT:
4988 if instance.vnc_bind_address is None:
4989 vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4991 vnc_bind_address = instance.vnc_bind_address
4992 if instance.network_port is None:
4993 vnc_console_port = None
4994 elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4995 vnc_console_port = "%s:%s" % (instance.primary_node,
4996 instance.network_port)
4997 elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4998 vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4999 instance.network_port,
5000 instance.primary_node)
5002 vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
5003 instance.network_port)
5004 idict["vnc_console_port"] = vnc_console_port
5005 idict["vnc_bind_address"] = vnc_bind_address
5006 idict["network_port"] = instance.network_port
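        # Examples of the computed console port value (hypothetical
        # node/port):
        #   bound to 0.0.0.0 on port 11004   -> "node1.example.com:11004"
        #   bound to 127.0.0.1 on port 11004 -> "127.0.0.1:11004 on node node1.example.com"
        #   bound to 192.0.2.9 on port 11004 -> "192.0.2.9:11004"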
5008       result[instance.name] = idict
5010     return result
5013 class LUSetInstanceParms(LogicalUnit):
5014   """Modifies an instance's parameters.
5017 HPATH = "instance-modify"
5018 HTYPE = constants.HTYPE_INSTANCE
5019 _OP_REQP = ["instance_name"]
5021 def BuildHooksEnv(self):
5024 This runs on the master, primary and secondaries.
5027     args = dict()
5028     if self.mem:
5029       args['memory'] = self.mem
5030     if self.vcpus:
5031       args['vcpus'] = self.vcpus
5032     if self.do_ip or self.do_bridge or self.mac:
5033       if self.do_ip:
5034         ip = self.ip
5035       else:
5036         ip = self.instance.nics[0].ip
5037       if self.bridge:
5038         bridge = self.bridge
5039       else:
5040         bridge = self.instance.nics[0].bridge
5041       if self.mac:
5042         mac = self.mac
5043       else:
5044         mac = self.instance.nics[0].mac
5045       args['nics'] = [(ip, bridge, mac)]
5046 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
5047 nl = [self.sstore.GetMasterNode(),
5048 self.instance.primary_node] + list(self.instance.secondary_nodes)
5051 def CheckPrereq(self):
5052 """Check prerequisites.
5054 This only checks the instance list against the existing names.
5057 self.mem = getattr(self.op, "mem", None)
5058 self.vcpus = getattr(self.op, "vcpus", None)
5059 self.ip = getattr(self.op, "ip", None)
5060 self.mac = getattr(self.op, "mac", None)
5061 self.bridge = getattr(self.op, "bridge", None)
5062 self.kernel_path = getattr(self.op, "kernel_path", None)
5063 self.initrd_path = getattr(self.op, "initrd_path", None)
5064 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
5065 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
5066 self.hvm_pae = getattr(self.op, "hvm_pae", None)
5067 self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
5068 self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
5069 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
5070 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
5071 self.force = getattr(self.op, "force", None)
5072 self.auto_balance = getattr(self.op, "auto_balance", None)
5073     all_parms = [
5074       self.mem, self.vcpus, self.ip, self.bridge, self.mac,
5075       self.kernel_path, self.initrd_path, self.hvm_boot_order,
5076       self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
5077       self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type,
5078       self.auto_balance]
5080     if all_parms.count(None) == len(all_parms):
5081 raise errors.OpPrereqError("No changes submitted")
5082 if self.mem is not None:
5084 self.mem = int(self.mem)
5085 except ValueError, err:
5086 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
5087 if self.vcpus is not None:
5089 self.vcpus = int(self.vcpus)
5090 except ValueError, err:
5091 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
5092     if self.ip is not None:
5093       self.do_ip = True
5094       if self.ip.lower() == "none":
5095         self.ip = None
5096       else:
5097         if not utils.IsValidIP(self.ip):
5098           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
5099     else:
5100       self.do_ip = False
5101     self.do_bridge = (self.bridge is not None)
5102 if self.mac is not None:
5103 if self.cfg.IsMacInUse(self.mac):
5104         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
5105                                    self.mac)
5106       if not utils.IsValidMac(self.mac):
5107 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
5109 if self.kernel_path is not None:
5110 self.do_kernel_path = True
5111 if self.kernel_path == constants.VALUE_NONE:
5112 raise errors.OpPrereqError("Can't set instance to no kernel")
5114 if self.kernel_path != constants.VALUE_DEFAULT:
5115 if not os.path.isabs(self.kernel_path):
5116           raise errors.OpPrereqError("The kernel path must be an absolute"
5117                                      " filename")
5119 self.do_kernel_path = False
5121 if self.initrd_path is not None:
5122 self.do_initrd_path = True
5123 if self.initrd_path not in (constants.VALUE_NONE,
5124 constants.VALUE_DEFAULT):
5125 if not os.path.isabs(self.initrd_path):
5126           raise errors.OpPrereqError("The initrd path must be an absolute"
5127                                      " filename")
5129 self.do_initrd_path = False
5131 # boot order verification
5132 if self.hvm_boot_order is not None:
5133 if self.hvm_boot_order != constants.VALUE_DEFAULT:
5134 if len(self.hvm_boot_order.strip("acdn")) != 0:
5135           raise errors.OpPrereqError("invalid boot order specified,"
5136                                      " must be one or more of [acdn]"
5137                                      " or 'default'")
5139 # hvm_cdrom_image_path verification
5140 if self.op.hvm_cdrom_image_path is not None:
5141 if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
5142 self.op.hvm_cdrom_image_path.lower() == "none"):
5143 raise errors.OpPrereqError("The path to the HVM CDROM image must"
5144 " be an absolute path or None, not %s" %
5145 self.op.hvm_cdrom_image_path)
5146 if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
5147 self.op.hvm_cdrom_image_path.lower() == "none"):
5148 raise errors.OpPrereqError("The HVM CDROM image must either be a"
5149 " regular file or a symlink pointing to"
5150 " an existing regular file, not %s" %
5151 self.op.hvm_cdrom_image_path)
5153 # vnc_bind_address verification
5154 if self.op.vnc_bind_address is not None:
5155 if not utils.IsValidIP(self.op.vnc_bind_address):
5156 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
5157 " like a valid IP address" %
5158 self.op.vnc_bind_address)
5160 # Xen HVM device type checks
5161 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
5162 if self.op.hvm_nic_type is not None:
5163 if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
5164 raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
5165 " HVM hypervisor" % self.op.hvm_nic_type)
5166 if self.op.hvm_disk_type is not None:
5167 if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
5168 raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
5169 " HVM hypervisor" % self.op.hvm_disk_type)
5171 # auto balance setting
5172 if self.auto_balance is not None:
5173 # convert the value to a proper bool value, if it's not
5174 self.auto_balance = bool(self.auto_balance)
5176 instance = self.cfg.GetInstanceInfo(
5177 self.cfg.ExpandInstanceName(self.op.instance_name))
5178 if instance is None:
5179 raise errors.OpPrereqError("No such instance name '%s'" %
5180 self.op.instance_name)
5181 self.op.instance_name = instance.name
5182     self.instance = instance
5183     self.warn = []
5184     if self.mem is not None and not self.force:
5185       pnode = self.instance.primary_node
5186       nodelist = [pnode]
5187       if instance.auto_balance:
5188 nodelist.extend(instance.secondary_nodes)
5189 instance_info = rpc.call_instance_info(pnode, instance.name)
5190 nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
5192       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
5193         # Assume the primary node is unreachable and go ahead
5194         self.warn.append("Can't get info from primary node %s" % pnode)
5195       else:
5196         if instance_info:
5197           current_mem = instance_info['memory']
5198         else:
5199           # Assume instance not running
5200           # (there is a slight race condition here, but it's not very probable,
5201           # and we have no other way to check)
5202           current_mem = 0
5203         miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
5204         if miss_mem > 0:
5205           raise errors.OpPrereqError("This change will prevent the instance"
5206                                      " from starting, due to %d MB of memory"
5207                                      " missing on its primary node" % miss_mem)
5209 if instance.auto_balance:
5210 for node in instance.secondary_nodes:
5211 if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
5212 self.warn.append("Can't get info from secondary node %s" % node)
5213 elif self.mem > nodeinfo[node]['memory_free']:
5214 self.warn.append("Not enough memory to failover instance to"
5215 " secondary node %s" % node)
5218 def Exec(self, feedback_fn):
5219 """Modifies an instance.
5221     All parameters take effect only at the next restart of the instance.
5222     """
5223     # Process here the warnings from CheckPrereq, as we don't have a
5224     # feedback_fn there.
5225     for warn in self.warn:
5226       feedback_fn("WARNING: %s" % warn)
5228     result = []
5229     instance = self.instance
5230     if self.mem:
5231       instance.memory = self.mem
5232       result.append(("mem", self.mem))
5233     if self.vcpus:
5234       instance.vcpus = self.vcpus
5235       result.append(("vcpus", self.vcpus))
5236     if self.do_ip:
5237       instance.nics[0].ip = self.ip
5238       result.append(("ip", self.ip))
5239     if self.do_bridge:
5240       instance.nics[0].bridge = self.bridge
5241       result.append(("bridge", self.bridge))
5242     if self.mac:
5243       instance.nics[0].mac = self.mac
5244       result.append(("mac", self.mac))
5245 if self.do_kernel_path:
5246 instance.kernel_path = self.kernel_path
5247 result.append(("kernel_path", self.kernel_path))
5248 if self.do_initrd_path:
5249 instance.initrd_path = self.initrd_path
5250 result.append(("initrd_path", self.initrd_path))
5251 if self.hvm_boot_order:
5252 if self.hvm_boot_order == constants.VALUE_DEFAULT:
5253 instance.hvm_boot_order = None
5255 instance.hvm_boot_order = self.hvm_boot_order
5256 result.append(("hvm_boot_order", self.hvm_boot_order))
5257 if self.hvm_acpi is not None:
5258 instance.hvm_acpi = self.hvm_acpi
5259 result.append(("hvm_acpi", self.hvm_acpi))
5260 if self.hvm_pae is not None:
5261 instance.hvm_pae = self.hvm_pae
5262 result.append(("hvm_pae", self.hvm_pae))
5263 if self.hvm_nic_type is not None:
5264 instance.hvm_nic_type = self.hvm_nic_type
5265 result.append(("hvm_nic_type", self.hvm_nic_type))
5266 if self.hvm_disk_type is not None:
5267 instance.hvm_disk_type = self.hvm_disk_type
5268 result.append(("hvm_disk_type", self.hvm_disk_type))
5269 if self.hvm_cdrom_image_path:
5270 if self.hvm_cdrom_image_path == constants.VALUE_NONE:
5271 instance.hvm_cdrom_image_path = None
5273 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
5274 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
5275 if self.vnc_bind_address:
5276 instance.vnc_bind_address = self.vnc_bind_address
5277 result.append(("vnc_bind_address", self.vnc_bind_address))
5278 if self.auto_balance is not None:
5279 instance.auto_balance = self.auto_balance
5280 result.append(("auto_balance", self.auto_balance))
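    # The accumulated 'result' is a list of (parameter, new value) pairs,
    # e.g. (hypothetical) [("mem", 512), ("bridge", "xen-br1")], which the
    # caller can show to the user.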
5282     self.cfg.AddInstance(instance)
5284     return result
5287 class LUQueryExports(NoHooksLU):
5288   """Query the exports list
5290   """
5291   _OP_REQP = ['nodes']
5293 def CheckPrereq(self):
5294 """Check that the nodelist contains only existing nodes.
5297 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
5299 def Exec(self, feedback_fn):
5300     """Compute the list of all the exported system images.
5302     Returns:
5303       a dictionary with the structure node->(export-list)
5304       where export-list is a list of the instances exported on
5305       that node.
5307     """
5308     return rpc.call_export_list(self.nodes)
5311 class LUExportInstance(LogicalUnit):
5312 """Export an instance to an image in the cluster.
5315 HPATH = "instance-export"
5316 HTYPE = constants.HTYPE_INSTANCE
5317 _OP_REQP = ["instance_name", "target_node", "shutdown"]
5319 def BuildHooksEnv(self):
5322 This will run on the master, primary node and target node.
5325     env = {
5326       "EXPORT_NODE": self.op.target_node,
5327       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5328       }
5329     env.update(_BuildInstanceHookEnvByObject(self.instance))
5330     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
5331           self.op.target_node]
5332     return env, nl, nl
5334 def CheckPrereq(self):
5335 """Check prerequisites.
5337 This checks that the instance and node names are valid.
5340 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5341 self.instance = self.cfg.GetInstanceInfo(instance_name)
5342 if self.instance is None:
5343 raise errors.OpPrereqError("Instance '%s' not found" %
5344 self.op.instance_name)
5347 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
5348 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
5350 if self.dst_node is None:
5351 raise errors.OpPrereqError("Destination node '%s' is unknown." %
5352 self.op.target_node)
5353 self.op.target_node = self.dst_node.name
5355 def Exec(self, feedback_fn):
5356 """Export an instance to an image in the cluster.
5359 instance = self.instance
5360 dst_node = self.dst_node
5361 src_node = instance.primary_node
5362 if self.op.shutdown:
5363 # shutdown the instance, but not the disks
5364 if not rpc.call_instance_shutdown(src_node, instance):
5365 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5366 (instance.name, src_node))
5368     vgname = self.cfg.GetVGName()
5370     snap_disks = []
5372     try:
5373       for disk in instance.disks:
5374 if disk.iv_name == "sda":
5375 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5376 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
5378 if not new_dev_name:
5379 logger.Error("could not snapshot block device %s on node %s" %
5380                        (disk.logical_id[1], src_node))
5381         else:
5382           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5383 logical_id=(vgname, new_dev_name),
5384 physical_id=(vgname, new_dev_name),
5385 iv_name=disk.iv_name)
5386 snap_disks.append(new_dev)
5388     finally:
5389       if self.op.shutdown and instance.status == "up":
5390 if not rpc.call_instance_start(src_node, instance, None):
5391 _ShutdownInstanceDisks(instance, self.cfg)
5392 raise errors.OpExecError("Could not start instance")
5394 # TODO: check for size
5396 for dev in snap_disks:
5397       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
5398                                       instance):
5399         logger.Error("could not export block device %s from node"
5400                      " %s to node %s" %
5401                      (dev.logical_id[1], src_node, dst_node.name))
5402 if not rpc.call_blockdev_remove(src_node, dev):
5403 logger.Error("could not remove snapshot block device %s from"
5404 " node %s" % (dev.logical_id[1], src_node))
5406 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5407 logger.Error("could not finalize export for instance %s on node %s" %
5408 (instance.name, dst_node.name))
5410 nodelist = self.cfg.GetNodeList()
5411 nodelist.remove(dst_node.name)
5413 # on one-node clusters nodelist will be empty after the removal
5414 # if we proceed the backup would be removed because OpQueryExports
5415     # substitutes an empty list with the full cluster node list.
5416     if nodelist:
5417       op = opcodes.OpQueryExports(nodes=nodelist)
5418 exportlist = self.proc.ChainOpCode(op)
5419 for node in exportlist:
5420 if instance.name in exportlist[node]:
5421 if not rpc.call_export_remove(node, instance.name):
5422 logger.Error("could not remove older export for instance %s"
5423 " on node %s" % (instance.name, node))
5426 class LURemoveExport(NoHooksLU):
5427 """Remove exports related to the named instance.
5430 _OP_REQP = ["instance_name"]
5432 def CheckPrereq(self):
5433     """Check prerequisites.
5435     """
5436     pass
5437 def Exec(self, feedback_fn):
5438 """Remove any export.
5441 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5442 # If the instance was not found we'll try with the name that was passed in.
5443     # This will only work if it was an FQDN, though.
5444     fqdn_warn = False
5445     if not instance_name:
5446       fqdn_warn = True
5447       instance_name = self.op.instance_name
5449 op = opcodes.OpQueryExports(nodes=[])
5450     exportlist = self.proc.ChainOpCode(op)
5451     found = False
5452     for node in exportlist:
5453       if instance_name in exportlist[node]:
5454         found = True
5455         if not rpc.call_export_remove(node, instance_name):
5456 logger.Error("could not remove export for instance %s"
5457 " on node %s" % (instance_name, node))
5459 if fqdn_warn and not found:
5460 feedback_fn("Export not found. If trying to remove an export belonging"
5461                 " to a deleted instance please use its Fully Qualified"
5462                 " Domain Name.")
5465 class TagsLU(NoHooksLU):
5466   """Generic tags LU.
5468   This is an abstract class which is the parent of all the other tags LUs.
5471 def CheckPrereq(self):
5472 """Check prerequisites.
5475 if self.op.kind == constants.TAG_CLUSTER:
5476 self.target = self.cfg.GetClusterInfo()
5477 elif self.op.kind == constants.TAG_NODE:
5478       name = self.cfg.ExpandNodeName(self.op.name)
5479       if name is None:
5480         raise errors.OpPrereqError("Invalid node name (%s)" %
5481                                    (self.op.name,))
5482       self.op.name = name
5483       self.target = self.cfg.GetNodeInfo(name)
5484 elif self.op.kind == constants.TAG_INSTANCE:
5485       name = self.cfg.ExpandInstanceName(self.op.name)
5486       if name is None:
5487         raise errors.OpPrereqError("Invalid instance name (%s)" %
5488                                    (self.op.name,))
5489       self.op.name = name
5490       self.target = self.cfg.GetInstanceInfo(name)
5491     else:
5492       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5493                                  str(self.op.kind))
5496 class LUGetTags(TagsLU):
5497 """Returns the tags of a given object.
5500 _OP_REQP = ["kind", "name"]
5502 def Exec(self, feedback_fn):
5503 """Returns the tag list.
5506 return self.target.GetTags()
5509 class LUSearchTags(NoHooksLU):
5510 """Searches the tags for a given pattern.
5513 _OP_REQP = ["pattern"]
5515 def CheckPrereq(self):
5516 """Check prerequisites.
5518 This checks the pattern passed for validity by compiling it.
5521     try:
5522       self.re = re.compile(self.op.pattern)
5523 except re.error, err:
5524 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5525 (self.op.pattern, err))
5527 def Exec(self, feedback_fn):
5528     """Returns the tag list.
5530     """
5531     cfg = self.cfg
5532     tgts = [("/cluster", cfg.GetClusterInfo())]
5533 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
5534 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5535 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
5536     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5537     results = []
5538     for path, target in tgts:
5539 for tag in target.GetTags():
5540 if self.re.search(tag):
5541           results.append((path, tag))
5542     return results
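    # Example (hypothetical data): searching for the pattern '^web' could
    # return
    #   [("/instances/web1.example.com", "webfarm"), ("/nodes/node1", "webnet")]
    # i.e. (path, tag) pairs for every matching tag in the cluster.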
5545 class LUAddTags(TagsLU):
5546 """Sets a tag on a given object.
5549 _OP_REQP = ["kind", "name", "tags"]
5551 def CheckPrereq(self):
5552 """Check prerequisites.
5554 This checks the type and length of the tag name and value.
5557 TagsLU.CheckPrereq(self)
5558 for tag in self.op.tags:
5559 objects.TaggableObject.ValidateTag(tag)
5561   def Exec(self, feedback_fn):
5562     """Sets the tag.
5564     """
5565     try:
5566       for tag in self.op.tags:
5567         self.target.AddTag(tag)
5568 except errors.TagError, err:
5569 raise errors.OpExecError("Error while setting tag: %s" % str(err))
5570     try:
5571       self.cfg.Update(self.target)
5572 except errors.ConfigurationError:
5573 raise errors.OpRetryError("There has been a modification to the"
5574 " config file and the operation has been"
5575 " aborted. Please retry.")
5578 class LUDelTags(TagsLU):
5579 """Delete a list of tags from a given object.
5582 _OP_REQP = ["kind", "name", "tags"]
5584 def CheckPrereq(self):
5585 """Check prerequisites.
5587 This checks that we have the given tag.
5590 TagsLU.CheckPrereq(self)
5591 for tag in self.op.tags:
5592 objects.TaggableObject.ValidateTag(tag, removal=True)
5593 del_tags = frozenset(self.op.tags)
5594 cur_tags = self.target.GetTags()
5595 if not del_tags <= cur_tags:
5596 diff_tags = del_tags - cur_tags
5597 diff_names = ["'%s'" % tag for tag in diff_tags]
5599 raise errors.OpPrereqError("Tag(s) %s not found" %
5600 (",".join(diff_names)))
5602 def Exec(self, feedback_fn):
5603 """Remove the tag from the object.
5606 for tag in self.op.tags:
5607 self.target.RemoveTag(tag)
5608     try:
5609       self.cfg.Update(self.target)
5610 except errors.ConfigurationError:
5611 raise errors.OpRetryError("There has been a modification to the"
5612 " config file and the operation has been"
5613 " aborted. Please retry.")
5615 class LUTestDelay(NoHooksLU):
5616 """Sleep for a specified amount of time.
5618   This LU sleeps on the master and/or nodes for a specified amount of
5619   time.
5621   """
5622   _OP_REQP = ["duration", "on_master", "on_nodes"]
5624 def CheckPrereq(self):
5625 """Check prerequisites.
5627     This checks that we have a good list of nodes and/or the duration
5628     is valid.
5630     """
5632     if self.op.on_nodes:
5633 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5635 def Exec(self, feedback_fn):
5636 """Do the actual sleep.
5639 if self.op.on_master:
5640 if not utils.TestDelay(self.op.duration):
5641 raise errors.OpExecError("Error during master delay test")
5642 if self.op.on_nodes:
5643       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5644       if not result:
5645         raise errors.OpExecError("Complete failure from rpc call")
5646       for node, node_result in result.items():
5647         if not node_result:
5648           raise errors.OpExecError("Failure during rpc call to node %s,"
5649                                    " result: %s" % (node, node_result))
5652 class IAllocator(object):
5653 """IAllocator framework.
5655   An IAllocator instance has four sets of attributes:
5656     - cfg/sstore that are needed to query the cluster
5657     - input data (all members of the _KEYS class attribute are required)
5658     - four buffer attributes (in|out_data|text), that represent the
5659       input (to the external script) in text and data structure format,
5660       and the output from it, again in two formats
5661     - the result variables from the script (success, info, nodes) for
5662       easy usage
5664   """
5665   _ALLO_KEYS = [
5666     "mem_size", "disks", "disk_template",
5667     "os", "tags", "nics", "vcpus",
5668     ]
5669   _RELO_KEYS = [
5670     "relocate_from",
5671     ]
5673   def __init__(self, cfg, sstore, mode, name, **kwargs):
5674     self.cfg = cfg
5675     self.sstore = sstore
5676 # init buffer variables
5677 self.in_text = self.out_text = self.in_data = self.out_data = None
5678     # init all input fields so that pylint is happy
5679     self.mode = mode
5680     self.name = name
5681     self.mem_size = self.disks = self.disk_template = None
5682 self.os = self.tags = self.nics = self.vcpus = None
5683 self.relocate_from = None
5685 self.required_nodes = None
5686 # init result fields
5687 self.success = self.info = self.nodes = None
5688 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5689 keyset = self._ALLO_KEYS
5690 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5691 keyset = self._RELO_KEYS
5693 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5694 " IAllocator" % self.mode)
5695     for key in kwargs:
5696       if key not in keyset:
5697         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5698                                      " IAllocator" % key)
5699       setattr(self, key, kwargs[key])
5700     for key in keyset:
5701       if key not in kwargs:
5702 raise errors.ProgrammerError("Missing input parameter '%s' to"
5703 " IAllocator" % key)
5704 self._BuildInputData()
5706 def _ComputeClusterData(self):
5707 """Compute the generic allocator input data.
5709     This is the data that is independent of the actual operation.
5711     """
5712     cfg = self.cfg
5714     data = {
5715       "version": 1,
5716       "cluster_name": self.sstore.GetClusterName(),
5717       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5718       "hypervisor_type": self.sstore.GetHypervisorType(),
5719       # we don't have job IDs
5720       }
5722     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5724     node_results = {}
5726     node_list = cfg.GetNodeList()
5727 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5728 for nname in node_list:
5729 ninfo = cfg.GetNodeInfo(nname)
5730 if nname not in node_data or not isinstance(node_data[nname], dict):
5731 raise errors.OpExecError("Can't get data for node %s" % nname)
5732 remote_info = node_data[nname]
5733 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5734 'vg_size', 'vg_free', 'cpu_total']:
5735 if attr not in remote_info:
5736           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5737                                    (nname, attr))
5738         try:
5739           remote_info[attr] = int(remote_info[attr])
5740 except ValueError, err:
5741 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5742 " %s" % (nname, attr, str(err)))
5743 # compute memory used by primary instances
5744 i_p_mem = i_p_up_mem = 0
5745 for iinfo in i_list:
5746 if iinfo.primary_node == nname:
5747 i_p_mem += iinfo.memory
5748 if iinfo.status == "up":
5749 i_p_up_mem += iinfo.memory
5751       # compute memory used by instances
5752       pnr = {
5753         "tags": list(ninfo.GetTags()),
5754 "total_memory": remote_info['memory_total'],
5755 "reserved_memory": remote_info['memory_dom0'],
5756 "free_memory": remote_info['memory_free'],
5757 "i_pri_memory": i_p_mem,
5758 "i_pri_up_memory": i_p_up_mem,
5759 "total_disk": remote_info['vg_size'],
5760 "free_disk": remote_info['vg_free'],
5761 "primary_ip": ninfo.primary_ip,
5762 "secondary_ip": ninfo.secondary_ip,
5763         "total_cpus": remote_info['cpu_total'],
5764         }
5765       node_results[nname] = pnr
5766     data["nodes"] = node_results
5768     # instance data
5769     instance_data = {}
5770     for iinfo in i_list:
5771 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5772                   for n in iinfo.nics]
5773       pir = {
5774         "tags": list(iinfo.GetTags()),
5775 "should_run": iinfo.status == "up",
5776 "vcpus": iinfo.vcpus,
5777 "memory": iinfo.memory,
5779 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5781 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5782 "disk_template": iinfo.disk_template,
5784 instance_data[iinfo.name] = pir
5786 data["instances"] = instance_data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
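
  # serializer.Dump turns in_data into the serialized text that is
  # handed to the allocator runner on the master node; a trivial sketch:
  #
  #   text = serializer.Dump({"a": 1})   # JSON-like text, e.g. '{"a": 1}'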

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()
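
  # Driver sketch ("dummy" is a hypothetical allocator script name):
  #
  #   ial.Run("dummy")        # validate=True: parses and checks output
  #   if ial.success:
  #     selected = ial.nodes  # node names proposed by the allocator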

  def _ValidateResult(self):
    """Process the allocator results.

    This parses the allocator output and, if valid, saves the result in
    self.out_data and in the individual result attributes.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
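
  # A reply that passes _ValidateResult looks like this on the wire
  # (illustrative values):
  #
  #   {"success": true,
  #    "info": "allocation successful",
  #    "nodes": ["node2.example.com", "node3.example.com"]}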


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and
    mode of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)
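
  # Summary of the direction parameter checked above:
  #   IALLOCATOR_DIR_IN:  only build and return the allocator input text
  #   IALLOCATOR_DIR_OUT: additionally run the allocator named in
  #                       self.op.allocator and return its raw output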

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
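

# End-to-end sketch for exercising LUTestAllocator through an opcode
# (values illustrative; assumes the matching opcodes.OpTestAllocator
# definition):
#
#   op = opcodes.OpTestAllocator(direction=constants.IALLOCATOR_DIR_IN,
#                                mode=constants.IALLOCATOR_MODE_ALLOC,
#                                name="inst1.example.com",
#                                mem_size=128,
#                                disks=[{"size": 1024, "mode": "w"},
#                                       {"size": 512, "mode": "w"}],
#                                disk_template=constants.DT_DRBD8,
#                                os="debian-etch", tags=[], nics=[],
#                                vcpus=1)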