4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
48 # Check whether the simplejson module supports indentation
51 simplejson.dumps(1, indent=_JSON_INDENT)
56 class LogicalUnit(object):
57 """Logical Unit base class.
59 Subclasses must follow these rules:
60 - implement CheckPrereq which also fills in the opcode instance
61 with all the fields (even if as None)
63 - implement BuildHooksEnv
64 - redefine HPATH and HTYPE
65 - optionally redefine their run requirements (REQ_CLUSTER,
66 REQ_MASTER); note that all commands require root permissions
75 def __init__(self, processor, op, cfg, sstore):
76 """Constructor for LogicalUnit.
78 This needs to be overridden in derived classes in order to check op validity.
86 for attr_name in self._OP_REQP:
87 attr_val = getattr(op, attr_name, None)
if attr_val is None:
89 raise errors.OpPrereqError("Required parameter '%s' missing" % attr_name)
92 if not cfg.IsCluster():
93 raise errors.OpPrereqError("Cluster not initialized yet,"
94 " use 'gnt-cluster init' first.")
96 master = sstore.GetMasterNode()
97 if master != utils.HostInfo().name:
98 raise errors.OpPrereqError("Commands must be run on the master node %s" % master)
101 def CheckPrereq(self):
102 """Check prerequisites for this LU.
104 This method should check that the prerequisites for the execution
105 of this LU are fulfilled. It can do internode communication, but
106 it should be idempotent - no cluster or system changes are allowed.
109 The method should raise errors.OpPrereqError in case something is
110 not fulfilled. Its return value is ignored.
112 This method should also update all the parameters of the opcode to
113 their canonical form; e.g. a short node name must be fully
114 expanded after this method has successfully completed (so that
115 hooks, logging, etc. work correctly).
118 raise NotImplementedError
120 def Exec(self, feedback_fn):
123 This method should implement the actual work. It should raise
124 errors.OpExecError for failures that are somewhat dealt with in
128 raise NotImplementedError
130 def BuildHooksEnv(self):
131 """Build hooks environment for this LU.
133 This method should return a three-element tuple consisting of: a dict
134 containing the environment that will be used for running the
135 specific hook for this LU, a list of node names on which the hook
136 should run before the execution, and a list of node names on which
137 the hook should run after the execution.
139 The keys of the dict must not be prefixed with 'GANETI_', as this will
140 be handled in the hooks runner. Also note additional keys will be
141 added by the hooks runner. If the LU doesn't define any
142 environment, an empty dict (and not None) should be returned.
144 As for the node lists, the master should not be included in
145 them, as it will be added by the hooks runner in case this LU
146 requires a cluster to run on (otherwise we don't have a node
147 list). If there are no nodes, an empty list (and not None) should be returned.
150 Note that if the HPATH for a LU class is None, this function will not be called.
154 raise NotImplementedError
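# Illustrative sketch (not from the original file): a concrete BuildHooksEnv
# typically builds the (env, pre_nodes, post_nodes) tuple described above,
# for example (names below are hypothetical):
#
#   def BuildHooksEnv(self):
#     env = {"OP_TARGET": self.op.instance_name}
#     nl = [self.sstore.GetMasterNode(), self.instance.primary_node]
#     return env, nl, nl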
157 class NoHooksLU(LogicalUnit):
158 """Simple LU which runs no hooks.
160 This LU is intended as a parent for other LogicalUnits which will
161 run no hooks, in order to reduce duplicate code.
167 def BuildHooksEnv(self):
170 This is a no-op, since we don't run hooks.
176 def _AddHostToEtcHosts(hostname):
177 """Wrapper around utils.SetEtcHostsEntry.
180 hi = utils.HostInfo(name=hostname)
181 utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
184 def _RemoveHostFromEtcHosts(hostname):
185 """Wrapper around utils.RemoveEtcHostsEntry.
188 hi = utils.HostInfo(name=hostname)
189 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
190 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
193 def _GetWantedNodes(lu, nodes):
194 """Returns list of checked and expanded node names.
197 nodes: List of nodes (strings) or None for all
200 if not isinstance(nodes, list):
201 raise errors.OpPrereqError("Invalid argument type 'nodes'")
207 node = lu.cfg.ExpandNodeName(name)
209 raise errors.OpPrereqError("No such node name '%s'" % name)
213 wanted = lu.cfg.GetNodeList()
214 return utils.NiceSort(wanted)
217 def _GetWantedInstances(lu, instances):
218 """Returns list of checked and expanded instance names.
221 instances: List of instances (strings) or None for all
224 if not isinstance(instances, list):
225 raise errors.OpPrereqError("Invalid argument type 'instances'")
230 for name in instances:
231 instance = lu.cfg.ExpandInstanceName(name)
233 raise errors.OpPrereqError("No such instance name '%s'" % name)
234 wanted.append(instance)
237 wanted = lu.cfg.GetInstanceList()
238 return utils.NiceSort(wanted)
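# Illustrative sketch (hypothetical names): both expansion helpers take an LU
# with a populated config, expand short names and return a NiceSort-ed list;
# an empty list falls through to the full list from the config:
#
#   nodes = _GetWantedNodes(lu, ["node1"])   # -> ["node1.example.com"]
#   nodes = _GetWantedNodes(lu, [])          # -> all configured nodes, sorted
#   insts = _GetWantedInstances(lu, [])      # -> all configured instances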
241 def _CheckOutputFields(static, dynamic, selected):
242 """Checks whether all selected fields are valid.
245 static: Static fields
246 dynamic: Dynamic fields
249 static_fields = frozenset(static)
250 dynamic_fields = frozenset(dynamic)
252 all_fields = static_fields | dynamic_fields
254 if not all_fields.issuperset(selected):
255 raise errors.OpPrereqError("Unknown output fields selected: %s"
256 % ",".join(frozenset(selected).
257 difference(all_fields)))
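# Illustrative sketch: _CheckOutputFields is a pure membership test, so a
# query LU can validate user-selected columns like this (field names here are
# only examples):
#
#   _CheckOutputFields(static=["name", "pip", "sip"],
#                      dynamic=["mfree", "dfree"],
#                      selected=["name", "mfree"])   # passes silently
#   _CheckOutputFields(static=["name"], dynamic=[],
#                      selected=["bogus"])           # raises OpPrereqError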
260 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
261 memory, vcpus, nics):
262 """Builds instance related env variables for hooks from single variables.
265 secondary_nodes: List of secondary nodes as strings
269 "INSTANCE_NAME": name,
270 "INSTANCE_PRIMARY": primary_node,
271 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
272 "INSTANCE_OS_TYPE": os_type,
273 "INSTANCE_STATUS": status,
274 "INSTANCE_MEMORY": memory,
275 "INSTANCE_VCPUS": vcpus,
279 nic_count = len(nics)
280 for idx, (ip, bridge, mac) in enumerate(nics):
283 env["INSTANCE_NIC%d_IP" % idx] = ip
284 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
285 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
289 env["INSTANCE_NIC_COUNT"] = nic_count
return env
294 def _BuildInstanceHookEnvByObject(instance, override=None):
295 """Builds instance related env variables for hooks from an object.
298 instance: objects.Instance object of instance
299 override: dict of values to override
302 'name': instance.name,
303 'primary_node': instance.primary_node,
304 'secondary_nodes': instance.secondary_nodes,
305 'os_type': instance.os,
306 'status': instance.status,
307 'memory': instance.memory,
308 'vcpus': instance.vcpus,
309 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
312 args.update(override)
313 return _BuildInstanceHookEnv(**args)
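# Illustrative sketch (hypothetical values): for an instance with one NIC the
# helpers above produce a hook environment roughly like the following, before
# the hooks runner adds the GANETI_ prefix and its own keys:
#
#   {
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 512,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_IP": "192.0.2.10",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#   }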
316 def _UpdateKnownHosts(fullnode, ip, pubkey):
317 """Ensure a node has a correct known_hosts entry.
320 fullnode - Fully qualified domain name of host. (str)
321 ip - IPv4 address of host (str)
322 pubkey - the public key of the cluster
325 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
326 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
328 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
337 logger.Debug('read %s' % (repr(rawline),))
339 parts = rawline.rstrip('\r\n').split()
341 # Ignore unwanted lines
342 if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
343 fields = parts[0].split(',')
348 for spec in [ ip, fullnode ]:
349 if spec not in fields:
354 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
355 if haveall and key == pubkey:
357 save_lines.append(rawline)
358 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
361 if havesome and (not haveall or key != pubkey):
363 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
366 save_lines.append(rawline)
369 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
370 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
373 save_lines = save_lines + add_lines
375 # Write a new file and replace old.
376 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
378 newfile = os.fdopen(fd, 'w')
380 newfile.write(''.join(save_lines))
383 logger.Debug("Wrote new known_hosts.")
384 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
387 # Simply appending a new line will do the trick.
389 for add in add_lines:
395 def _HasValidVG(vglist, vgname):
396 """Checks if the volume group list is valid.
398 A non-None return value means there's an error, and the return value
399 is the error message.
402 vgsize = vglist.get(vgname, None)
404 return "volume group '%s' missing" % vgname
406 return ("volume group '%s' too small (20480MiB required, %dMiB found)" % (vgname, vgsize))
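# Illustrative sketch, mirroring how the callers below use this helper: any
# non-None return value is treated as the error message.
#
#   vgstatus = _HasValidVG(utils.ListVolumeGroups(), "xenvg")
#   if vgstatus:
#     raise errors.OpPrereqError("Error: %s" % vgstatus)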
411 def _InitSSHSetup(node):
412 """Setup the SSH configuration for the cluster.
415 This generates a dsa keypair for root, adds the pub key to the
416 permitted hosts and adds the hostkey to its own known hosts.
419 node: the name of this host as a fqdn
422 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
424 for name in priv_key, pub_key:
425 if os.path.exists(name):
426 utils.CreateBackup(name)
427 utils.RemoveFile(name)
429 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
433 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
436 f = open(pub_key, 'r')
438 utils.AddAuthorizedKey(auth_keys, f.read(8192))
443 def _InitGanetiServerSetup(ss):
444 """Setup the necessary configuration for the initial node daemon.
446 This creates the nodepass file containing the shared password for
447 the cluster and also generates the SSL certificate.
450 # Create pseudo random password
451 randpass = sha.new(os.urandom(64)).hexdigest()
452 # and write it into sstore
453 ss.SetKey(ss.SS_NODED_PASS, randpass)
455 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
456 "-days", str(365*5), "-nodes", "-x509",
457 "-keyout", constants.SSL_CERT_FILE,
458 "-out", constants.SSL_CERT_FILE, "-batch"])
460 raise errors.OpExecError("could not generate server ssl cert, command"
461 " %s had exitcode %s and error message %s" %
462 (result.cmd, result.exit_code, result.output))
464 os.chmod(constants.SSL_CERT_FILE, 0400)
466 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
469 raise errors.OpExecError("Could not start the node daemon, command %s"
470 " had exitcode %s and error %s" %
471 (result.cmd, result.exit_code, result.output))
474 def _CheckInstanceBridgesExist(instance):
475 """Check that the brigdes needed by an instance exist.
478 # check bridges existance
479 brlist = [nic.bridge for nic in instance.nics]
480 if not rpc.call_bridges_exist(instance.primary_node, brlist):
481 raise errors.OpPrereqError("one or more target bridges %s do not"
482 " exist on destination node '%s'" %
483 (brlist, instance.primary_node))
486 class LUInitCluster(LogicalUnit):
487 """Initialise the cluster.
490 HPATH = "cluster-init"
491 HTYPE = constants.HTYPE_CLUSTER
492 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
493 "def_bridge", "master_netdev"]
496 def BuildHooksEnv(self):
499 Notes: Since we don't require a cluster, we must manually add
500 ourselves in the post-run node list.
503 env = {"OP_TARGET": self.op.cluster_name}
504 return env, [], [self.hostname.name]
506 def CheckPrereq(self):
507 """Verify that the passed name is a valid one.
510 if config.ConfigWriter.IsCluster():
511 raise errors.OpPrereqError("Cluster is already initialised")
513 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
514 if not os.path.exists(constants.VNC_PASSWORD_FILE):
515 raise errors.OpPrereqError("Please prepare the cluster VNC"
" password file %s" %
517 constants.VNC_PASSWORD_FILE)
519 self.hostname = hostname = utils.HostInfo()
521 if hostname.ip.startswith("127."):
522 raise errors.OpPrereqError("This host's IP resolves to the private"
523 " range (%s). Please fix DNS or %s." %
524 (hostname.ip, constants.ETC_HOSTS))
526 if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
527 source=constants.LOCALHOST_IP_ADDRESS):
528 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
529 " to %s,\nbut this ip address does not"
530 " belong to this host."
531 " Aborting." % hostname.ip)
533 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
535 if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
537 raise errors.OpPrereqError("Cluster IP already active. Aborting.")
539 secondary_ip = getattr(self.op, "secondary_ip", None)
540 if secondary_ip and not utils.IsValidIP(secondary_ip):
541 raise errors.OpPrereqError("Invalid secondary ip given")
543 secondary_ip != hostname.ip and
544 (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
545 source=constants.LOCALHOST_IP_ADDRESS))):
546 raise errors.OpPrereqError("You gave %s as secondary IP,"
547 " but it does not belong to this host." %
549 self.secondary_ip = secondary_ip
551 # checks presence of the volume group given
552 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
555 raise errors.OpPrereqError("Error: %s" % vgstatus)
557 if not re.match("^[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}$",
559 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
562 if self.op.hypervisor_type not in constants.HYPER_TYPES:
563 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
564 self.op.hypervisor_type)
566 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
568 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
569 (self.op.master_netdev,
570 result.output.strip()))
572 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
573 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
574 raise errors.OpPrereqError("Init.d script '%s' missing or not"
575 " executable." % constants.NODE_INITD_SCRIPT)
577 def Exec(self, feedback_fn):
578 """Initialize the cluster.
581 clustername = self.clustername
582 hostname = self.hostname
584 # set up the simple store
585 self.sstore = ss = ssconf.SimpleStore()
586 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
587 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
588 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
589 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
590 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
592 # set up the inter-node password and certificate
593 _InitGanetiServerSetup(ss)
595 # start the master ip
596 rpc.call_node_start_master(hostname.name)
598 # set up ssh config and /etc/hosts
599 f = open(constants.SSH_HOST_RSA_PUB, 'r')
604 sshkey = sshline.split(" ")[1]
606 _AddHostToEtcHosts(hostname.name)
608 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
610 _InitSSHSetup(hostname.name)
612 # init of cluster config file
613 self.cfg = cfgw = config.ConfigWriter()
614 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
615 sshkey, self.op.mac_prefix,
616 self.op.vg_name, self.op.def_bridge)
619 class LUDestroyCluster(NoHooksLU):
620 """Logical unit for destroying the cluster.
625 def CheckPrereq(self):
626 """Check prerequisites.
628 This checks whether the cluster is empty.
630 Any errors are signalled by raising errors.OpPrereqError.
633 master = self.sstore.GetMasterNode()
635 nodelist = self.cfg.GetNodeList()
636 if len(nodelist) != 1 or nodelist[0] != master:
637 raise errors.OpPrereqError("There are still %d node(s) in"
638 " this cluster." % (len(nodelist) - 1))
639 instancelist = self.cfg.GetInstanceList()
641 raise errors.OpPrereqError("There are still %d instance(s) in"
642 " this cluster." % len(instancelist))
644 def Exec(self, feedback_fn):
645 """Destroys the cluster.
648 master = self.sstore.GetMasterNode()
649 if not rpc.call_node_stop_master(master):
650 raise errors.OpExecError("Could not disable the master role")
651 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
652 utils.CreateBackup(priv_key)
653 utils.CreateBackup(pub_key)
654 rpc.call_node_leave_cluster(master)
657 class LUVerifyCluster(NoHooksLU):
658 """Verifies the cluster status.
663 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
664 remote_version, feedback_fn):
665 """Run multiple tests against a node.
668 - compares ganeti version
669 - checks vg existence and size > 20G
670 - checks config file checksum
671 - checks ssh to other nodes
674 node: name of the node to check
675 file_list: required list of files
676 local_cksum: dictionary of local files and their checksums
679 # compares ganeti version
680 local_version = constants.PROTOCOL_VERSION
681 if not remote_version:
682 feedback_fn(" - ERROR: connection to %s failed" % (node))
685 if local_version != remote_version:
686 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
687 (local_version, node, remote_version))
690 # checks vg existence and size > 20G
694 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
698 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
700 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
703 # checks config file checksum
706 if 'filelist' not in node_result:
708 feedback_fn(" - ERROR: node hasn't returned file checksum data")
710 remote_cksum = node_result['filelist']
711 for file_name in file_list:
712 if file_name not in remote_cksum:
714 feedback_fn(" - ERROR: file '%s' missing" % file_name)
715 elif remote_cksum[file_name] != local_cksum[file_name]:
717 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
719 if 'nodelist' not in node_result:
721 feedback_fn(" - ERROR: node hasn't returned node connectivity data")
723 if node_result['nodelist']:
725 for node in node_result['nodelist']:
726 feedback_fn(" - ERROR: communication with node '%s': %s" %
727 (node, node_result['nodelist'][node]))
728 hyp_result = node_result.get('hypervisor', None)
729 if hyp_result is not None:
730 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
733 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
734 node_instance, feedback_fn):
735 """Verify an instance.
737 This function checks to see if the required block devices are
738 available on the instance's node.
743 node_current = instanceconfig.primary_node
746 instanceconfig.MapLVsByNode(node_vol_should)
748 for node in node_vol_should:
749 for volume in node_vol_should[node]:
750 if node not in node_vol_is or volume not in node_vol_is[node]:
751 feedback_fn(" - ERROR: volume %s missing on node %s" %
755 if instanceconfig.status != 'down':
756 if (node_current not in node_instance or
757 not instance in node_instance[node_current]):
758 feedback_fn(" - ERROR: instance %s not running on node %s" %
759 (instance, node_current))
762 for node in node_instance:
763 if node != node_current:
764 if instance in node_instance[node]:
765 feedback_fn(" - ERROR: instance %s should not run on node %s" %
771 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
772 """Verify if there are any unknown volumes in the cluster.
774 The .os, .swap and backup volumes are ignored. All other volumes are reported as unknown.
780 for node in node_vol_is:
781 for volume in node_vol_is[node]:
782 if node not in node_vol_should or volume not in node_vol_should[node]:
783 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
788 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
789 """Verify the list of running instances.
791 This checks what instances are running but unknown to the cluster.
795 for node in node_instance:
796 for runninginstance in node_instance[node]:
797 if runninginstance not in instancelist:
798 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
799 (runninginstance, node))
803 def CheckPrereq(self):
804 """Check prerequisites.
806 This has no prerequisites.
811 def Exec(self, feedback_fn):
812 """Verify integrity of cluster, performing various test on nodes.
816 feedback_fn("* Verifying global settings")
817 for msg in self.cfg.VerifyConfig():
818 feedback_fn(" - ERROR: %s" % msg)
820 vg_name = self.cfg.GetVGName()
821 nodelist = utils.NiceSort(self.cfg.GetNodeList())
822 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
823 i_non_redundant = [] # Non redundant instances
828 # FIXME: verify OS list
830 file_names = list(self.sstore.GetFileList())
831 file_names.append(constants.SSL_CERT_FILE)
832 file_names.append(constants.CLUSTER_CONF_FILE)
833 local_checksums = utils.FingerprintFiles(file_names)
835 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
836 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
837 all_instanceinfo = rpc.call_instance_list(nodelist)
838 all_vglist = rpc.call_vg_list(nodelist)
839 node_verify_param = {
840 'filelist': file_names,
841 'nodelist': nodelist,
844 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
845 all_rversion = rpc.call_version(nodelist)
846 all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
848 for node in nodelist:
849 feedback_fn("* Verifying node %s" % node)
850 result = self._VerifyNode(node, file_names, local_checksums,
851 all_vglist[node], all_nvinfo[node],
852 all_rversion[node], feedback_fn)
856 volumeinfo = all_volumeinfo[node]
858 if isinstance(volumeinfo, basestring):
859 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
860 (node, volumeinfo[-400:].encode('string_escape')))
862 node_volume[node] = {}
863 elif not isinstance(volumeinfo, dict):
864 feedback_fn(" - ERROR: connection to %s failed" % (node,))
868 node_volume[node] = volumeinfo
871 nodeinstance = all_instanceinfo[node]
872 if not isinstance(nodeinstance, list):
873 feedback_fn(" - ERROR: connection to %s failed" % (node,))
877 node_instance[node] = nodeinstance
880 nodeinfo = all_ninfo[node]
881 if not isinstance(nodeinfo, dict):
882 feedback_fn(" - ERROR: connection to %s failed" % (node,))
888 "mfree": int(nodeinfo['memory_free']),
889 "dfree": int(nodeinfo['vg_free']),
892 # dictionary holding all instances this node is secondary for,
893 # grouped by their primary node. Each key is a cluster node, and each
894 # value is a list of instances which have the key as primary and the
895 # current node as secondary. this is handy to calculate N+1 memory
896 # availability if you can only failover from a primary to its
898 "sinst-by-pnode": {},
901 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
907 for instance in instancelist:
908 feedback_fn("* Verifying instance %s" % instance)
909 inst_config = self.cfg.GetInstanceInfo(instance)
910 result = self._VerifyInstance(instance, inst_config, node_volume,
911 node_instance, feedback_fn)
914 inst_config.MapLVsByNode(node_vol_should)
916 pnode = inst_config.primary_node
917 if pnode in node_info:
918 node_info[pnode]['pinst'].append(instance)
920 feedback_fn(" - ERROR: instance %s, connection to primary node"
921 " %s failed" % (instance, pnode))
924 # If the instance is non-redundant we cannot survive losing its primary
925 # node, so we are not N+1 compliant. On the other hand we have no disk
926 # templates with more than one secondary so that situation is not well supported either.
928 # FIXME: does not support file-backed instances
929 if len(inst_config.secondary_nodes) == 0:
930 i_non_redundant.append(instance)
931 elif len(inst_config.secondary_nodes) > 1:
932 feedback_fn(" - WARNING: multiple secondaries for instance %s"
935 for snode in inst_config.secondary_nodes:
936 if snode in node_info:
937 node_info[snode]['sinst'].append(instance)
938 if pnode not in node_info[snode]['sinst-by-pnode']:
939 node_info[snode]['sinst-by-pnode'][pnode] = []
940 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
942 feedback_fn(" - ERROR: instance %s, connection to secondary node"
943 " %s failed" % (instance, snode))
945 feedback_fn("* Verifying orphan volumes")
946 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
950 feedback_fn("* Verifying remaining instances")
951 result = self._VerifyOrphanInstances(instancelist, node_instance,
958 class LUVerifyDisks(NoHooksLU):
959 """Verifies the cluster disks status.
964 def CheckPrereq(self):
965 """Check prerequisites.
967 This has no prerequisites.
972 def Exec(self, feedback_fn):
973 """Verify integrity of cluster disks.
976 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
978 vg_name = self.cfg.GetVGName()
979 nodes = utils.NiceSort(self.cfg.GetNodeList())
980 instances = [self.cfg.GetInstanceInfo(name)
981 for name in self.cfg.GetInstanceList()]
984 for inst in instances:
986 if (inst.status != "up" or
987 inst.disk_template not in constants.DTS_NET_MIRROR):
989 inst.MapLVsByNode(inst_lvs)
990 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
991 for node, vol_list in inst_lvs.iteritems():
993 nv_dict[(node, vol)] = inst
998 node_lvs = rpc.call_volume_list(nodes, vg_name)
1003 lvs = node_lvs[node]
1005 if isinstance(lvs, basestring):
1006 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1007 res_nlvm[node] = lvs
1008 elif not isinstance(lvs, dict):
1009 logger.Info("connection to node %s failed or invalid data returned" %
1011 res_nodes.append(node)
1014 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1015 inst = nv_dict.pop((node, lv_name), None)
1016 if (not lv_online and inst is not None
1017 and inst.name not in res_instances):
1018 res_instances.append(inst.name)
1020 # any leftover items in nv_dict are missing LVs, let's arrange the
1022 for key, inst in nv_dict.iteritems():
1023 if inst.name not in res_missing:
1024 res_missing[inst.name] = []
1025 res_missing[inst.name].append(key)
1030 class LURenameCluster(LogicalUnit):
1031 """Rename the cluster.
1034 HPATH = "cluster-rename"
1035 HTYPE = constants.HTYPE_CLUSTER
1038 def BuildHooksEnv(self):
1043 "OP_TARGET": self.sstore.GetClusterName(),
1044 "NEW_NAME": self.op.name,
1046 mn = self.sstore.GetMasterNode()
1047 return env, [mn], [mn]
1049 def CheckPrereq(self):
1050 """Verify that the passed name is a valid one.
1053 hostname = utils.HostInfo(self.op.name)
1055 new_name = hostname.name
1056 self.ip = new_ip = hostname.ip
1057 old_name = self.sstore.GetClusterName()
1058 old_ip = self.sstore.GetMasterIP()
1059 if new_name == old_name and new_ip == old_ip:
1060 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1061 " cluster has changed")
1062 if new_ip != old_ip:
1063 result = utils.RunCmd(["fping", "-q", new_ip])
1064 if not result.failed:
1065 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1066 " reachable on the network. Aborting." %
1069 self.op.name = new_name
1071 def Exec(self, feedback_fn):
1072 """Rename the cluster.
1075 clustername = self.op.name
1079 # shutdown the master IP
1080 master = ss.GetMasterNode()
1081 if not rpc.call_node_stop_master(master):
1082 raise errors.OpExecError("Could not disable the master role")
1086 ss.SetKey(ss.SS_MASTER_IP, ip)
1087 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1089 # Distribute updated ss config to all nodes
1090 myself = self.cfg.GetNodeInfo(master)
1091 dist_nodes = self.cfg.GetNodeList()
1092 if myself.name in dist_nodes:
1093 dist_nodes.remove(myself.name)
1095 logger.Debug("Copying updated ssconf data to all nodes")
1096 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1097 fname = ss.KeyToFilename(keyname)
1098 result = rpc.call_upload_file(dist_nodes, fname)
1099 for to_node in dist_nodes:
1100 if not result[to_node]:
1101 logger.Error("copy of file %s to node %s failed" %
1104 if not rpc.call_node_start_master(master):
1105 logger.Error("Could not re-enable the master role on the master,"
1106 " please restart manually.")
1109 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1110 """Sleep and poll for an instance's disk to sync.
1113 if not instance.disks:
1117 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1119 node = instance.primary_node
1121 for dev in instance.disks:
1122 cfgw.SetDiskID(dev, node)
1128 cumul_degraded = False
1129 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1131 proc.LogWarning("Can't get any data from node %s" % node)
1134 raise errors.RemoteError("Can't contact node %s for mirror data,"
1135 " aborting." % node)
1139 for i in range(len(rstats)):
1142 proc.LogWarning("Can't compute data for node %s/%s" %
1143 (node, instance.disks[i].iv_name))
1145 # we ignore the ldisk parameter
1146 perc_done, est_time, is_degraded, _ = mstat
1147 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1148 if perc_done is not None:
1150 if est_time is not None:
1151 rem_time = "%d estimated seconds remaining" % est_time
1154 rem_time = "no time estimate"
1155 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1156 (instance.disks[i].iv_name, perc_done, rem_time))
1163 time.sleep(min(60, max_time))
1169 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1170 return not cumul_degraded
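# Illustrative sketch: _WaitForSync returns True only when no disk is left
# degraded, so callers typically react along these lines (hypothetical names):
#
#   if not _WaitForSync(cfg, instance, proc):
#     raise errors.OpExecError("Disks of instance %s did not sync cleanly" %
#                              instance.name)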
1173 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1174 """Check that mirrors are not degraded.
1176 The ldisk parameter, if True, will change the test from the
1177 is_degraded attribute (which represents overall non-ok status for
1178 the device(s)) to the ldisk (representing the local storage status).
1181 cfgw.SetDiskID(dev, node)
1188 if on_primary or dev.AssembleOnSecondary():
1189 rstats = rpc.call_blockdev_find(node, dev)
1191 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1194 result = result and (not rstats[idx])
1196 for child in dev.children:
1197 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1202 class LUDiagnoseOS(NoHooksLU):
1203 """Logical unit for OS diagnose/query.
1208 def CheckPrereq(self):
1209 """Check prerequisites.
1211 This always succeeds, since this is a pure query LU.
1216 def Exec(self, feedback_fn):
1217 """Compute the list of OSes.
1220 node_list = self.cfg.GetNodeList()
1221 node_data = rpc.call_os_diagnose(node_list)
1222 if node_data == False:
1223 raise errors.OpExecError("Can't gather the list of OSes")
1227 class LURemoveNode(LogicalUnit):
1228 """Logical unit for removing a node.
1231 HPATH = "node-remove"
1232 HTYPE = constants.HTYPE_NODE
1233 _OP_REQP = ["node_name"]
1235 def BuildHooksEnv(self):
1238 This doesn't run on the target node in the pre phase as a failed
1239 node would not allow itself to run.
1243 "OP_TARGET": self.op.node_name,
1244 "NODE_NAME": self.op.node_name,
1246 all_nodes = self.cfg.GetNodeList()
1247 all_nodes.remove(self.op.node_name)
1248 return env, all_nodes, all_nodes
1250 def CheckPrereq(self):
1251 """Check prerequisites.
1254 - the node exists in the configuration
1255 - it does not have primary or secondary instances
1256 - it's not the master
1258 Any errors are signalled by raising errors.OpPrereqError.
1261 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1263 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1265 instance_list = self.cfg.GetInstanceList()
1267 masternode = self.sstore.GetMasterNode()
1268 if node.name == masternode:
1269 raise errors.OpPrereqError("Node is the master node,"
1270 " you need to failover first.")
1272 for instance_name in instance_list:
1273 instance = self.cfg.GetInstanceInfo(instance_name)
1274 if node.name == instance.primary_node:
1275 raise errors.OpPrereqError("Instance %s still running on the node,"
1276 " please remove first." % instance_name)
1277 if node.name in instance.secondary_nodes:
1278 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1279 " please remove first." % instance_name)
1280 self.op.node_name = node.name
1283 def Exec(self, feedback_fn):
1284 """Removes the node from the cluster.
1288 logger.Info("stopping the node daemon and removing configs from node %s" %
1291 rpc.call_node_leave_cluster(node.name)
1293 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1295 logger.Info("Removing node %s from config" % node.name)
1297 self.cfg.RemoveNode(node.name)
1299 _RemoveHostFromEtcHosts(node.name)
1302 class LUQueryNodes(NoHooksLU):
1303 """Logical unit for querying nodes.
1306 _OP_REQP = ["output_fields", "names"]
1308 def CheckPrereq(self):
1309 """Check prerequisites.
1311 This checks that the fields required are valid output fields.
1314 self.dynamic_fields = frozenset(["dtotal", "dfree",
1315 "mtotal", "mnode", "mfree",
1318 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1319 "pinst_list", "sinst_list",
1321 dynamic=self.dynamic_fields,
1322 selected=self.op.output_fields)
1324 self.wanted = _GetWantedNodes(self, self.op.names)
1326 def Exec(self, feedback_fn):
1327 """Computes the list of nodes and their attributes.
1330 nodenames = self.wanted
1331 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1333 # begin data gathering
1335 if self.dynamic_fields.intersection(self.op.output_fields):
1337 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1338 for name in nodenames:
1339 nodeinfo = node_data.get(name, None)
1342 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1343 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1344 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1345 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1346 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1347 "bootid": nodeinfo['bootid'],
1350 live_data[name] = {}
1352 live_data = dict.fromkeys(nodenames, {})
1354 node_to_primary = dict([(name, set()) for name in nodenames])
1355 node_to_secondary = dict([(name, set()) for name in nodenames])
1357 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1358 "sinst_cnt", "sinst_list"))
1359 if inst_fields & frozenset(self.op.output_fields):
1360 instancelist = self.cfg.GetInstanceList()
1362 for instance_name in instancelist:
1363 inst = self.cfg.GetInstanceInfo(instance_name)
1364 if inst.primary_node in node_to_primary:
1365 node_to_primary[inst.primary_node].add(inst.name)
1366 for secnode in inst.secondary_nodes:
1367 if secnode in node_to_secondary:
1368 node_to_secondary[secnode].add(inst.name)
1370 # end data gathering
1373 for node in nodelist:
1375 for field in self.op.output_fields:
1378 elif field == "pinst_list":
1379 val = list(node_to_primary[node.name])
1380 elif field == "sinst_list":
1381 val = list(node_to_secondary[node.name])
1382 elif field == "pinst_cnt":
1383 val = len(node_to_primary[node.name])
1384 elif field == "sinst_cnt":
1385 val = len(node_to_secondary[node.name])
1386 elif field == "pip":
1387 val = node.primary_ip
1388 elif field == "sip":
1389 val = node.secondary_ip
1390 elif field in self.dynamic_fields:
1391 val = live_data[node.name].get(field, None)
1393 raise errors.ParameterError(field)
1394 node_output.append(val)
1395 output.append(node_output)
1400 class LUQueryNodeVolumes(NoHooksLU):
1401 """Logical unit for getting volumes on node(s).
1404 _OP_REQP = ["nodes", "output_fields"]
1406 def CheckPrereq(self):
1407 """Check prerequisites.
1409 This checks that the fields required are valid output fields.
1412 self.nodes = _GetWantedNodes(self, self.op.nodes)
1414 _CheckOutputFields(static=["node"],
1415 dynamic=["phys", "vg", "name", "size", "instance"],
1416 selected=self.op.output_fields)
1419 def Exec(self, feedback_fn):
1420 """Computes the list of nodes and their attributes.
1423 nodenames = self.nodes
1424 volumes = rpc.call_node_volumes(nodenames)
1426 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1427 in self.cfg.GetInstanceList()]
1429 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1432 for node in nodenames:
1433 if node not in volumes or not volumes[node]:
1436 node_vols = volumes[node][:]
1437 node_vols.sort(key=lambda vol: vol['dev'])
1439 for vol in node_vols:
1441 for field in self.op.output_fields:
1444 elif field == "phys":
1448 elif field == "name":
1450 elif field == "size":
1451 val = int(float(vol['size']))
1452 elif field == "instance":
1454 if node not in lv_by_node[inst]:
1456 if vol['name'] in lv_by_node[inst][node]:
1462 raise errors.ParameterError(field)
1463 node_output.append(str(val))
1465 output.append(node_output)
1470 class LUAddNode(LogicalUnit):
1471 """Logical unit for adding node to the cluster.
1475 HTYPE = constants.HTYPE_NODE
1476 _OP_REQP = ["node_name"]
1478 def BuildHooksEnv(self):
1481 This will run on all nodes before, and on all nodes + the new node after.
1485 "OP_TARGET": self.op.node_name,
1486 "NODE_NAME": self.op.node_name,
1487 "NODE_PIP": self.op.primary_ip,
1488 "NODE_SIP": self.op.secondary_ip,
1490 nodes_0 = self.cfg.GetNodeList()
1491 nodes_1 = nodes_0 + [self.op.node_name, ]
1492 return env, nodes_0, nodes_1
1494 def CheckPrereq(self):
1495 """Check prerequisites.
1498 - the new node is not already in the config
1500 - its parameters (single/dual homed) matches the cluster
1502 Any errors are signalled by raising errors.OpPrereqError.
1505 node_name = self.op.node_name
1508 dns_data = utils.HostInfo(node_name)
1510 node = dns_data.name
1511 primary_ip = self.op.primary_ip = dns_data.ip
1512 secondary_ip = getattr(self.op, "secondary_ip", None)
1513 if secondary_ip is None:
1514 secondary_ip = primary_ip
1515 if not utils.IsValidIP(secondary_ip):
1516 raise errors.OpPrereqError("Invalid secondary IP given")
1517 self.op.secondary_ip = secondary_ip
1518 node_list = cfg.GetNodeList()
1519 if node in node_list:
1520 raise errors.OpPrereqError("Node %s is already in the configuration"
1523 for existing_node_name in node_list:
1524 existing_node = cfg.GetNodeInfo(existing_node_name)
1525 if (existing_node.primary_ip == primary_ip or
1526 existing_node.secondary_ip == primary_ip or
1527 existing_node.primary_ip == secondary_ip or
1528 existing_node.secondary_ip == secondary_ip):
1529 raise errors.OpPrereqError("New node ip address(es) conflict with"
1530 " existing node %s" % existing_node.name)
1532 # check that the type of the node (single versus dual homed) is the
1533 # same as for the master
1534 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1535 master_singlehomed = myself.secondary_ip == myself.primary_ip
1536 newbie_singlehomed = secondary_ip == primary_ip
1537 if master_singlehomed != newbie_singlehomed:
1538 if master_singlehomed:
1539 raise errors.OpPrereqError("The master has no private ip but the"
1540 " new node has one")
1542 raise errors.OpPrereqError("The master has a private ip but the"
1543 " new node doesn't have one")
1545 # checks reachability
1546 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1547 raise errors.OpPrereqError("Node not reachable by ping")
1549 if not newbie_singlehomed:
1550 # check reachability from my secondary ip to newbie's secondary ip
1551 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1552 source=myself.secondary_ip):
1553 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1554 " based ping to noded port")
1556 self.new_node = objects.Node(name=node,
1557 primary_ip=primary_ip,
1558 secondary_ip=secondary_ip)
1560 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1561 if not os.path.exists(constants.VNC_PASSWORD_FILE):
1562 raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1563 constants.VNC_PASSWORD_FILE)
1565 def Exec(self, feedback_fn):
1566 """Adds the new node to the cluster.
1569 new_node = self.new_node
1570 node = new_node.name
1572 # set up inter-node password and certificate and restarts the node daemon
1573 gntpass = self.sstore.GetNodeDaemonPassword()
1574 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1575 raise errors.OpExecError("ganeti password corruption detected")
1576 f = open(constants.SSL_CERT_FILE)
1578 gntpem = f.read(8192)
1581 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1582 # so we use this to detect an invalid certificate; as long as the
1583 # cert doesn't contain this, the here-document will be correctly
1584 # parsed by the shell sequence below
1585 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1586 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1587 if not gntpem.endswith("\n"):
1588 raise errors.OpExecError("PEM must end with newline")
1589 logger.Info("copying the cluster password to %s and starting the node daemon" % node)
1591 # and then connect with ssh to set password and start ganeti-noded
1592 # note that all the below variables are sanitized at this point,
1593 # either by being constants or by the checks above
1595 mycommand = ("umask 077 && "
1596 "echo '%s' > '%s' && "
1597 "cat > '%s' << '!EOF.' && \n"
1598 "%s!EOF.\n%s restart" %
1599 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1600 constants.SSL_CERT_FILE, gntpem,
1601 constants.NODE_INITD_SCRIPT))
1603 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1605 raise errors.OpExecError("Remote command on node %s, error: %s,"
1607 (node, result.fail_reason, result.output))
1609 # check connectivity
1612 result = rpc.call_version([node])[node]
1614 if constants.PROTOCOL_VERSION == result:
1615 logger.Info("communication to node %s fine, sw version %s match" %
1618 raise errors.OpExecError("Version mismatch master version %s,"
1619 " node version %s" %
1620 (constants.PROTOCOL_VERSION, result))
1622 raise errors.OpExecError("Cannot get version from the new node")
1625 logger.Info("copy ssh key to node %s" % node)
1626 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1628 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1629 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1635 keyarray.append(f.read())
1639 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1640 keyarray[3], keyarray[4], keyarray[5])
1643 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1645 # Add node to our /etc/hosts, and add key to known_hosts
1646 _AddHostToEtcHosts(new_node.name)
1648 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1649 self.cfg.GetHostKey())
1651 if new_node.secondary_ip != new_node.primary_ip:
1652 if not rpc.call_node_tcp_ping(new_node.name,
1653 constants.LOCALHOST_IP_ADDRESS,
1654 new_node.secondary_ip,
1655 constants.DEFAULT_NODED_PORT,
1657 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1658 " you gave (%s). Please fix and re-run this"
1659 " command." % new_node.secondary_ip)
1661 success, msg = ssh.VerifyNodeHostname(node)
1663 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1664 " than the one the resolver gives: %s."
1665 " Please fix and re-run this command." %
1668 # Distribute updated /etc/hosts and known_hosts to all nodes,
1669 # including the node just added
1670 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1671 dist_nodes = self.cfg.GetNodeList() + [node]
1672 if myself.name in dist_nodes:
1673 dist_nodes.remove(myself.name)
1675 logger.Debug("Copying hosts and known_hosts to all nodes")
1676 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1677 result = rpc.call_upload_file(dist_nodes, fname)
1678 for to_node in dist_nodes:
1679 if not result[to_node]:
1680 logger.Error("copy of file %s to node %s failed" %
1683 to_copy = ss.GetFileList()
1684 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1685 to_copy.append(constants.VNC_PASSWORD_FILE)
1686 for fname in to_copy:
1687 if not ssh.CopyFileToNode(node, fname):
1688 logger.Error("could not copy file %s to node %s" % (fname, node))
1690 logger.Info("adding node %s to cluster.conf" % node)
1691 self.cfg.AddNode(new_node)
1694 class LUMasterFailover(LogicalUnit):
1695 """Failover the master node to the current node.
1697 This is a special LU in that it must run on a non-master node.
1700 HPATH = "master-failover"
1701 HTYPE = constants.HTYPE_CLUSTER
1705 def BuildHooksEnv(self):
1708 This will run on the new master only in the pre phase, and on all
1709 the nodes in the post phase.
1713 "OP_TARGET": self.new_master,
1714 "NEW_MASTER": self.new_master,
1715 "OLD_MASTER": self.old_master,
1717 return env, [self.new_master], self.cfg.GetNodeList()
1719 def CheckPrereq(self):
1720 """Check prerequisites.
1722 This checks that we are not already the master.
1725 self.new_master = utils.HostInfo().name
1726 self.old_master = self.sstore.GetMasterNode()
1728 if self.old_master == self.new_master:
1729 raise errors.OpPrereqError("This command must be run on the node"
1730 " where you want the new master to be."
1731 " %s is already the master" %
1734 def Exec(self, feedback_fn):
1735 """Failover the master node.
1737 This command, when run on a non-master node, will cause the current
1738 master to cease being master, and the non-master to become the new master.
1742 #TODO: do not rely on gethostname returning the FQDN
1743 logger.Info("setting master to %s, old master: %s" %
1744 (self.new_master, self.old_master))
1746 if not rpc.call_node_stop_master(self.old_master):
1747 logger.Error("could not disable the master role on the old master"
1748 " %s, please disable manually" % self.old_master)
1751 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1752 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1753 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1754 logger.Error("could not distribute the new simple store master file"
1755 " to the other nodes, please check.")
1757 if not rpc.call_node_start_master(self.new_master):
1758 logger.Error("could not start the master role on the new master"
1759 " %s, please check" % self.new_master)
1760 feedback_fn("Error in activating the master IP on the new master,"
1761 " please fix manually.")
1765 class LUQueryClusterInfo(NoHooksLU):
1766 """Query cluster configuration.
1772 def CheckPrereq(self):
1773 """No prerequsites needed for this LU.
1778 def Exec(self, feedback_fn):
1779 """Return cluster config.
1783 "name": self.sstore.GetClusterName(),
1784 "software_version": constants.RELEASE_VERSION,
1785 "protocol_version": constants.PROTOCOL_VERSION,
1786 "config_version": constants.CONFIG_VERSION,
1787 "os_api_version": constants.OS_API_VERSION,
1788 "export_version": constants.EXPORT_VERSION,
1789 "master": self.sstore.GetMasterNode(),
1790 "architecture": (platform.architecture()[0], platform.machine()),
1796 class LUClusterCopyFile(NoHooksLU):
1797 """Copy file to cluster.
1800 _OP_REQP = ["nodes", "filename"]
1802 def CheckPrereq(self):
1803 """Check prerequisites.
1805 It should check that the named file exists and that the given list of nodes is valid.
1809 if not os.path.exists(self.op.filename):
1810 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1812 self.nodes = _GetWantedNodes(self, self.op.nodes)
1814 def Exec(self, feedback_fn):
1815 """Copy a file from master to some nodes.
1818 opts - class with options as members
1819 args - list containing a single element, the file name
1821 nodes - list containing the name of target nodes; if empty, all nodes
1824 filename = self.op.filename
1826 myname = utils.HostInfo().name
1828 for node in self.nodes:
1831 if not ssh.CopyFileToNode(node, filename):
1832 logger.Error("Copy of file %s to node %s failed" % (filename, node))
1835 class LUDumpClusterConfig(NoHooksLU):
1836 """Return a text-representation of the cluster-config.
1841 def CheckPrereq(self):
1842 """No prerequisites.
1847 def Exec(self, feedback_fn):
1848 """Dump a representation of the cluster config to the standard output.
1851 return self.cfg.DumpConfig()
1854 class LURunClusterCommand(NoHooksLU):
1855 """Run a command on some nodes.
1858 _OP_REQP = ["command", "nodes"]
1860 def CheckPrereq(self):
1861 """Check prerequisites.
1863 It checks that the given list of nodes is valid.
1866 self.nodes = _GetWantedNodes(self, self.op.nodes)
1868 def Exec(self, feedback_fn):
1869 """Run a command on some nodes.
1872 # put the master at the end of the nodes list
1873 master_node = self.sstore.GetMasterNode()
1874 if master_node in self.nodes:
1875 self.nodes.remove(master_node)
1876 self.nodes.append(master_node)
1879 for node in self.nodes:
1880 result = ssh.SSHCall(node, "root", self.op.command)
1881 data.append((node, result.output, result.exit_code))
1886 class LUActivateInstanceDisks(NoHooksLU):
1887 """Bring up an instance's disks.
1890 _OP_REQP = ["instance_name"]
1892 def CheckPrereq(self):
1893 """Check prerequisites.
1895 This checks that the instance is in the cluster.
1898 instance = self.cfg.GetInstanceInfo(
1899 self.cfg.ExpandInstanceName(self.op.instance_name))
1900 if instance is None:
1901 raise errors.OpPrereqError("Instance '%s' not known" %
1902 self.op.instance_name)
1903 self.instance = instance
1906 def Exec(self, feedback_fn):
1907 """Activate the disks.
1910 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1912 raise errors.OpExecError("Cannot activate block devices")
1917 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1918 """Prepare the block devices for an instance.
1920 This sets up the block devices on all nodes.
1923 instance: a ganeti.objects.Instance object
1924 ignore_secondaries: if true, errors on secondary nodes won't result
1925 in an error return from the function
1928 false if the operation failed
1929 list of (host, instance_visible_name, node_visible_name) if the operation
1930 succeeded, with the mapping from node devices to instance devices
1934 iname = instance.name
1935 # With the two passes mechanism we try to reduce the window of
1936 # opportunity for the race condition of switching DRBD to primary
1937 before handshaking occurred, but we do not eliminate it
1939 # The proper fix would be to wait (with some limits) until the
1940 # connection has been made and drbd transitions from WFConnection
1941 # into any other network-connected state (Connected, SyncTarget,
1944 # 1st pass, assemble on all nodes in secondary mode
1945 for inst_disk in instance.disks:
1946 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1947 cfg.SetDiskID(node_disk, node)
1948 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1950 logger.Error("could not prepare block device %s on node %s"
1951 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1952 if not ignore_secondaries:
1955 # FIXME: race condition on drbd migration to primary
1957 # 2nd pass, do only the primary node
1958 for inst_disk in instance.disks:
1959 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1960 if node != instance.primary_node:
1962 cfg.SetDiskID(node_disk, node)
1963 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1965 logger.Error("could not prepare block device %s on node %s"
1966 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1968 device_info.append((instance.primary_node, inst_disk.iv_name, result))
1970 # leave the disks configured for the primary node
1971 # this is a workaround that would be fixed better by
1972 # improving the logical/physical id handling
1973 for disk in instance.disks:
1974 cfg.SetDiskID(disk, instance.primary_node)
1976 return disks_ok, device_info
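# Illustrative sketch of consuming the (disks_ok, device_info) result, in the
# spirit of LUActivateInstanceDisks above (the feedback_fn line is only an
# example):
#
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node, iv_name, dev_path in device_info:
#     feedback_fn("  %s on %s is %s" % (iv_name, node, dev_path))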
1979 def _StartInstanceDisks(cfg, instance, force):
1980 """Start the disks of an instance.
1983 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1984 ignore_secondaries=force)
1986 _ShutdownInstanceDisks(instance, cfg)
1987 if force is not None and not force:
1988 logger.Error("If the message above refers to a secondary node,"
1989 " you can retry the operation using '--force'.")
1990 raise errors.OpExecError("Disk consistency error")
1993 class LUDeactivateInstanceDisks(NoHooksLU):
1994 """Shutdown an instance's disks.
1997 _OP_REQP = ["instance_name"]
1999 def CheckPrereq(self):
2000 """Check prerequisites.
2002 This checks that the instance is in the cluster.
2005 instance = self.cfg.GetInstanceInfo(
2006 self.cfg.ExpandInstanceName(self.op.instance_name))
2007 if instance is None:
2008 raise errors.OpPrereqError("Instance '%s' not known" %
2009 self.op.instance_name)
2010 self.instance = instance
2012 def Exec(self, feedback_fn):
2013 """Deactivate the disks
2016 instance = self.instance
2017 ins_l = rpc.call_instance_list([instance.primary_node])
2018 ins_l = ins_l[instance.primary_node]
2019 if not isinstance(ins_l, list):
2020 raise errors.OpExecError("Can't contact node '%s'" %
2021 instance.primary_node)
2023 if self.instance.name in ins_l:
2024 raise errors.OpExecError("Instance is running, can't shutdown"
2027 _ShutdownInstanceDisks(instance, self.cfg)
2030 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2031 """Shutdown block devices of an instance.
2033 This does the shutdown on all nodes of the instance.
2035 If ignore_primary is false, errors on the primary node are ignored.
2040 for disk in instance.disks:
2041 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2042 cfg.SetDiskID(top_disk, node)
2043 if not rpc.call_blockdev_shutdown(node, top_disk):
2044 logger.Error("could not shutdown block device %s on node %s" %
2045 (disk.iv_name, node))
2046 if not ignore_primary or node != instance.primary_node:
2051 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2052 """Checks if a node has enough free memory.
2054 This function checks if a given node has the needed amount of free
2055 memory. In case the node has less memory or we cannot get the
2056 information from the node, this function raises an OpPrereqError exception.
2060 - cfg: a ConfigWriter instance
2061 - node: the node name
2062 - reason: string to use in the error message
2063 - requested: the amount of memory in MiB
2066 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2067 if not nodeinfo or not isinstance(nodeinfo, dict):
2068 raise errors.OpPrereqError("Could not contact node %s for resource"
2069 " information" % (node,))
2071 free_mem = nodeinfo[node].get('memory_free')
2072 if not isinstance(free_mem, int):
2073 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2074 " was '%s'" % (node, free_mem))
2075 if requested > free_mem:
2076 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2077 " needed %s MiB, available %s MiB" %
2078 (node, reason, requested, free_mem))
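# Illustrative sketch: start-up style LUs (see LUStartupInstance below) call
# the check with a human-readable reason and the memory the operation needs:
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)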
2081 class LUStartupInstance(LogicalUnit):
2082 """Starts an instance.
2085 HPATH = "instance-start"
2086 HTYPE = constants.HTYPE_INSTANCE
2087 _OP_REQP = ["instance_name", "force"]
2089 def BuildHooksEnv(self):
2092 This runs on master, primary and secondary nodes of the instance.
2096 "FORCE": self.op.force,
2098 env.update(_BuildInstanceHookEnvByObject(self.instance))
2099 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2100 list(self.instance.secondary_nodes))
2103 def CheckPrereq(self):
2104 """Check prerequisites.
2106 This checks that the instance is in the cluster.
2109 instance = self.cfg.GetInstanceInfo(
2110 self.cfg.ExpandInstanceName(self.op.instance_name))
2111 if instance is None:
2112 raise errors.OpPrereqError("Instance '%s' not known" %
2113 self.op.instance_name)
2115 # check bridges existence
2116 _CheckInstanceBridgesExist(instance)
2118 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2119 "starting instance %s" % instance.name,
2122 self.instance = instance
2123 self.op.instance_name = instance.name
2125 def Exec(self, feedback_fn):
2126 """Start the instance.
2129 instance = self.instance
2130 force = self.op.force
2131 extra_args = getattr(self.op, "extra_args", "")
2133 self.cfg.MarkInstanceUp(instance.name)
2135 node_current = instance.primary_node
2137 _StartInstanceDisks(self.cfg, instance, force)
2139 if not rpc.call_instance_start(node_current, instance, extra_args):
2140 _ShutdownInstanceDisks(instance, self.cfg)
2141 raise errors.OpExecError("Could not start instance")
2144 class LURebootInstance(LogicalUnit):
2145 """Reboot an instance.
2148 HPATH = "instance-reboot"
2149 HTYPE = constants.HTYPE_INSTANCE
2150 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2152 def BuildHooksEnv(self):
2155 This runs on master, primary and secondary nodes of the instance.
2159 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2161 env.update(_BuildInstanceHookEnvByObject(self.instance))
2162 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2163 list(self.instance.secondary_nodes))
2166 def CheckPrereq(self):
2167 """Check prerequisites.
2169 This checks that the instance is in the cluster.
2172 instance = self.cfg.GetInstanceInfo(
2173 self.cfg.ExpandInstanceName(self.op.instance_name))
2174 if instance is None:
2175 raise errors.OpPrereqError("Instance '%s' not known" %
2176 self.op.instance_name)
2178 # check bridges existence
2179 _CheckInstanceBridgesExist(instance)
2181 self.instance = instance
2182 self.op.instance_name = instance.name
2184 def Exec(self, feedback_fn):
2185 """Reboot the instance.
2188 instance = self.instance
2189 ignore_secondaries = self.op.ignore_secondaries
2190 reboot_type = self.op.reboot_type
2191 extra_args = getattr(self.op, "extra_args", "")
2193 node_current = instance.primary_node
2195 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2196 constants.INSTANCE_REBOOT_HARD,
2197 constants.INSTANCE_REBOOT_FULL]:
2198 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2199 (constants.INSTANCE_REBOOT_SOFT,
2200 constants.INSTANCE_REBOOT_HARD,
2201 constants.INSTANCE_REBOOT_FULL))
2203 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2204 constants.INSTANCE_REBOOT_HARD]:
2205 if not rpc.call_instance_reboot(node_current, instance,
2206 reboot_type, extra_args):
2207 raise errors.OpExecError("Could not reboot instance")
2209 if not rpc.call_instance_shutdown(node_current, instance):
2210 raise errors.OpExecError("could not shutdown instance for full reboot")
2211 _ShutdownInstanceDisks(instance, self.cfg)
2212 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2213 if not rpc.call_instance_start(node_current, instance, extra_args):
2214 _ShutdownInstanceDisks(instance, self.cfg)
2215 raise errors.OpExecError("Could not start instance for full reboot")
2217 self.cfg.MarkInstanceUp(instance.name)
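# Minimal illustrative sketch of how this LU is typically driven (hedged: it
# assumes the matching opcode in opcodes.py carries exactly the _OP_REQP
# fields listed above, and uses an example instance name):
#
#   op = opcodes.OpRebootInstance(instance_name="inst1.example.com",
#                                 reboot_type=constants.INSTANCE_REBOOT_SOFT,
#                                 ignore_secondaries=False)
#
# A soft or hard reboot maps to a single rpc.call_instance_reboot, while a
# full reboot shuts down the instance and its disks and starts them again.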
2220 class LUShutdownInstance(LogicalUnit):
2221 """Shutdown an instance.
2224 HPATH = "instance-stop"
2225 HTYPE = constants.HTYPE_INSTANCE
2226 _OP_REQP = ["instance_name"]
2228 def BuildHooksEnv(self):
2231 This runs on master, primary and secondary nodes of the instance.
2234 env = _BuildInstanceHookEnvByObject(self.instance)
2235 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2236 list(self.instance.secondary_nodes))
2239 def CheckPrereq(self):
2240 """Check prerequisites.
2242 This checks that the instance is in the cluster.
2245 instance = self.cfg.GetInstanceInfo(
2246 self.cfg.ExpandInstanceName(self.op.instance_name))
2247 if instance is None:
2248 raise errors.OpPrereqError("Instance '%s' not known" %
2249 self.op.instance_name)
2250 self.instance = instance
2252 def Exec(self, feedback_fn):
2253 """Shutdown the instance.
2256 instance = self.instance
2257 node_current = instance.primary_node
2258 self.cfg.MarkInstanceDown(instance.name)
2259 if not rpc.call_instance_shutdown(node_current, instance):
2260 logger.Error("could not shutdown instance")
2262 _ShutdownInstanceDisks(instance, self.cfg)
2265 class LUReinstallInstance(LogicalUnit):
2266 """Reinstall an instance.
2269 HPATH = "instance-reinstall"
2270 HTYPE = constants.HTYPE_INSTANCE
2271 _OP_REQP = ["instance_name"]
2273 def BuildHooksEnv(self):
2276 This runs on master, primary and secondary nodes of the instance.
2279 env = _BuildInstanceHookEnvByObject(self.instance)
2280 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2281 list(self.instance.secondary_nodes))
2284 def CheckPrereq(self):
2285 """Check prerequisites.
2287 This checks that the instance is in the cluster and is not running.
2290 instance = self.cfg.GetInstanceInfo(
2291 self.cfg.ExpandInstanceName(self.op.instance_name))
2292 if instance is None:
2293 raise errors.OpPrereqError("Instance '%s' not known" %
2294 self.op.instance_name)
2295 if instance.disk_template == constants.DT_DISKLESS:
2296 raise errors.OpPrereqError("Instance '%s' has no disks" %
2297 self.op.instance_name)
2298 if instance.status != "down":
2299 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2300 self.op.instance_name)
2301 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2303 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2304 (self.op.instance_name,
2305 instance.primary_node))
2307 self.op.os_type = getattr(self.op, "os_type", None)
2308 if self.op.os_type is not None:
2310 pnode = self.cfg.GetNodeInfo(
2311 self.cfg.ExpandNodeName(instance.primary_node))
2313 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2315 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2317 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2318 " primary node" % self.op.os_type)
2320 self.instance = instance
2322 def Exec(self, feedback_fn):
2323 """Reinstall the instance.
2326 inst = self.instance
2328 if self.op.os_type is not None:
2329 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2330 inst.os = self.op.os_type
2331 self.cfg.AddInstance(inst)
2333 _StartInstanceDisks(self.cfg, inst, None)
2335 feedback_fn("Running the instance OS create scripts...")
2336 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2337 raise errors.OpExecError("Could not install OS for instance %s"
2339 (inst.name, inst.primary_node))
2341 _ShutdownInstanceDisks(inst, self.cfg)
2344 class LURenameInstance(LogicalUnit):
2345 """Rename an instance.
2348 HPATH = "instance-rename"
2349 HTYPE = constants.HTYPE_INSTANCE
2350 _OP_REQP = ["instance_name", "new_name"]
2352 def BuildHooksEnv(self):
2355 This runs on master, primary and secondary nodes of the instance.
2358 env = _BuildInstanceHookEnvByObject(self.instance)
2359 env["INSTANCE_NEW_NAME"] = self.op.new_name
2360 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2361 list(self.instance.secondary_nodes))
2364 def CheckPrereq(self):
2365 """Check prerequisites.
2367 This checks that the instance is in the cluster and is not running.
2370 instance = self.cfg.GetInstanceInfo(
2371 self.cfg.ExpandInstanceName(self.op.instance_name))
2372 if instance is None:
2373 raise errors.OpPrereqError("Instance '%s' not known" %
2374 self.op.instance_name)
2375 if instance.status != "down":
2376 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2377 self.op.instance_name)
2378 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2380 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2381 (self.op.instance_name,
2382 instance.primary_node))
2383 self.instance = instance
2385 # new name verification
2386 name_info = utils.HostInfo(self.op.new_name)
2388 self.op.new_name = new_name = name_info.name
2389 instance_list = self.cfg.GetInstanceList()
2390 if new_name in instance_list:
2391 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2394 if not getattr(self.op, "ignore_ip", False):
2395 command = ["fping", "-q", name_info.ip]
2396 result = utils.RunCmd(command)
2397 if not result.failed:
2398 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2399 (name_info.ip, new_name))
2402 def Exec(self, feedback_fn):
2403 """Reinstall the instance.
2406 inst = self.instance
2407 old_name = inst.name
2409 self.cfg.RenameInstance(inst.name, self.op.new_name)
2411 # re-read the instance from the configuration after rename
2412 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2414 _StartInstanceDisks(self.cfg, inst, None)
2416 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2418 msg = ("Could run OS rename script for instance %s on node %s (but the"
2419 " instance has been renamed in Ganeti)" %
2420 (inst.name, inst.primary_node))
2423 _ShutdownInstanceDisks(inst, self.cfg)
2426 class LURemoveInstance(LogicalUnit):
2427 """Remove an instance.
2430 HPATH = "instance-remove"
2431 HTYPE = constants.HTYPE_INSTANCE
2432 _OP_REQP = ["instance_name"]
2434 def BuildHooksEnv(self):
2437 This runs on master, primary and secondary nodes of the instance.
2440 env = _BuildInstanceHookEnvByObject(self.instance)
2441 nl = [self.sstore.GetMasterNode()]
2444 def CheckPrereq(self):
2445 """Check prerequisites.
2447 This checks that the instance is in the cluster.
2450 instance = self.cfg.GetInstanceInfo(
2451 self.cfg.ExpandInstanceName(self.op.instance_name))
2452 if instance is None:
2453 raise errors.OpPrereqError("Instance '%s' not known" %
2454 self.op.instance_name)
2455 self.instance = instance
2457 def Exec(self, feedback_fn):
2458 """Remove the instance.
2461 instance = self.instance
2462 logger.Info("shutting down instance %s on node %s" %
2463 (instance.name, instance.primary_node))
2465 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2466 if self.op.ignore_failures:
2467 feedback_fn("Warning: can't shutdown instance")
2469 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2470 (instance.name, instance.primary_node))
2472 logger.Info("removing block devices for instance %s" % instance.name)
2474 if not _RemoveDisks(instance, self.cfg):
2475 if self.op.ignore_failures:
2476 feedback_fn("Warning: can't remove instance's disks")
2478 raise errors.OpExecError("Can't remove instance's disks")
2480 logger.Info("removing instance %s out of cluster config" % instance.name)
2482 self.cfg.RemoveInstance(instance.name)
2485 class LUQueryInstances(NoHooksLU):
2486 """Logical unit for querying instances.
2489 _OP_REQP = ["output_fields", "names"]
2491 def CheckPrereq(self):
2492 """Check prerequisites.
2494 This checks that the fields required are valid output fields.
2497 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2498 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2499 "admin_state", "admin_ram",
2500 "disk_template", "ip", "mac", "bridge",
2501 "sda_size", "sdb_size", "vcpus"],
2502 dynamic=self.dynamic_fields,
2503 selected=self.op.output_fields)
2505 self.wanted = _GetWantedInstances(self, self.op.names)
2507 def Exec(self, feedback_fn):
2508 """Computes the list of nodes and their attributes.
2511 instance_names = self.wanted
2512 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2515 # begin data gathering
2517 nodes = frozenset([inst.primary_node for inst in instance_list])
2520 if self.dynamic_fields.intersection(self.op.output_fields):
2522 node_data = rpc.call_all_instances_info(nodes)
2524 result = node_data[name]
2526 live_data.update(result)
2527 elif result == False:
2528 bad_nodes.append(name)
2529 # else no instance is alive
2531 live_data = dict([(name, {}) for name in instance_names])
2533 # end data gathering
2536 for instance in instance_list:
2538 for field in self.op.output_fields:
2543 elif field == "pnode":
2544 val = instance.primary_node
2545 elif field == "snodes":
2546 val = list(instance.secondary_nodes)
2547 elif field == "admin_state":
2548 val = (instance.status != "down")
2549 elif field == "oper_state":
2550 if instance.primary_node in bad_nodes:
2553 val = bool(live_data.get(instance.name))
2554 elif field == "status":
2555 if instance.primary_node in bad_nodes:
2556 val = "ERROR_nodedown"
2558 running = bool(live_data.get(instance.name))
2560 if instance.status != "down":
2565 if instance.status != "down":
2569 elif field == "admin_ram":
2570 val = instance.memory
2571 elif field == "oper_ram":
2572 if instance.primary_node in bad_nodes:
2574 elif instance.name in live_data:
2575 val = live_data[instance.name].get("memory", "?")
2578 elif field == "disk_template":
2579 val = instance.disk_template
2581 val = instance.nics[0].ip
2582 elif field == "bridge":
2583 val = instance.nics[0].bridge
2584 elif field == "mac":
2585 val = instance.nics[0].mac
2586 elif field == "sda_size" or field == "sdb_size":
2587 disk = instance.FindDisk(field[:3])
2592 elif field == "vcpus":
2593 val = instance.vcpus
2595 raise errors.ParameterError(field)
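# Illustrative query sketch (hedged: example field selection and values only,
# assuming the matching opcode in opcodes.py mirrors the _OP_REQP above):
#
#   op = opcodes.OpQueryInstances(output_fields=["name", "pnode", "oper_ram"],
#                                 names=[])
#
# would yield one row per instance, e.g. ["inst1.example.com",
# "node1.example.com", 128], with dynamic fields filled from the live data
# gathered above and static fields taken from the configuration.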
2602 class LUFailoverInstance(LogicalUnit):
2603 """Failover an instance.
2606 HPATH = "instance-failover"
2607 HTYPE = constants.HTYPE_INSTANCE
2608 _OP_REQP = ["instance_name", "ignore_consistency"]
2610 def BuildHooksEnv(self):
2613 This runs on master, primary and secondary nodes of the instance.
2617 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2619 env.update(_BuildInstanceHookEnvByObject(self.instance))
2620 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2623 def CheckPrereq(self):
2624 """Check prerequisites.
2626 This checks that the instance is in the cluster.
2629 instance = self.cfg.GetInstanceInfo(
2630 self.cfg.ExpandInstanceName(self.op.instance_name))
2631 if instance is None:
2632 raise errors.OpPrereqError("Instance '%s' not known" %
2633 self.op.instance_name)
2635 if instance.disk_template not in constants.DTS_NET_MIRROR:
2636 raise errors.OpPrereqError("Instance's disk layout is not"
2637 " network mirrored, cannot failover.")
2639 secondary_nodes = instance.secondary_nodes
2640 if not secondary_nodes:
2641 raise errors.ProgrammerError("no secondary node but using "
2642 "DT_REMOTE_RAID1 template")
2644 target_node = secondary_nodes[0]
2645 # check memory requirements on the secondary node
2646 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2647 instance.name, instance.memory)
2649 # check bridge existence
2650 brlist = [nic.bridge for nic in instance.nics]
2651 if not rpc.call_bridges_exist(target_node, brlist):
2652 raise errors.OpPrereqError("One or more target bridges %s does not"
2653 " exist on destination node '%s'" %
2654 (brlist, target_node))
2656 self.instance = instance
2658 def Exec(self, feedback_fn):
2659 """Failover an instance.
2661 The failover is done by shutting it down on its present node and
2662 starting it on the secondary.
2665 instance = self.instance
2667 source_node = instance.primary_node
2668 target_node = instance.secondary_nodes[0]
2670 feedback_fn("* checking disk consistency between source and target")
2671 for dev in instance.disks:
2672 # for remote_raid1, these are md over drbd
2673 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2674 if instance.status == "up" and not self.op.ignore_consistency:
2675 raise errors.OpExecError("Disk %s is degraded on target node,"
2676 " aborting failover." % dev.iv_name)
2678 feedback_fn("* shutting down instance on source node")
2679 logger.Info("Shutting down instance %s on node %s" %
2680 (instance.name, source_node))
2682 if not rpc.call_instance_shutdown(source_node, instance):
2683 if self.op.ignore_consistency:
2684 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2685 " anyway. Please make sure node %s is down" %
2686 (instance.name, source_node, source_node))
2688 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2689 (instance.name, source_node))
2691 feedback_fn("* deactivating the instance's disks on source node")
2692 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2693 raise errors.OpExecError("Can't shut down the instance's disks.")
2695 instance.primary_node = target_node
2696 # distribute new instance config to the other nodes
2697 self.cfg.AddInstance(instance)
2699 # Only start the instance if it's marked as up
2700 if instance.status == "up":
2701 feedback_fn("* activating the instance's disks on target node")
2702 logger.Info("Starting instance %s on node %s" %
2703 (instance.name, target_node))
2705 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2706 ignore_secondaries=True)
2708 _ShutdownInstanceDisks(instance, self.cfg)
2709 raise errors.OpExecError("Can't activate the instance's disks")
2711 feedback_fn("* starting the instance on the target node")
2712 if not rpc.call_instance_start(target_node, instance, None):
2713 _ShutdownInstanceDisks(instance, self.cfg)
2714 raise errors.OpExecError("Could not start instance %s on node %s." %
2715 (instance.name, target_node))
2718 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2719 """Create a tree of block devices on the primary node.
2721 This always creates all devices.
2725 for child in device.children:
2726 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2729 cfg.SetDiskID(device, node)
2730 new_id = rpc.call_blockdev_create(node, device, device.size,
2731 instance.name, True, info)
2734 if device.physical_id is None:
2735 device.physical_id = new_id
2739 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2740 """Create a tree of block devices on a secondary node.
2742 If this device type has to be created on secondaries, create it and all
2745 its children. If not, just recurse to children keeping the same 'force' value.
2748 if device.CreateOnSecondary():
2751 for child in device.children:
2752 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2753 child, force, info):
2758 cfg.SetDiskID(device, node)
2759 new_id = rpc.call_blockdev_create(node, device, device.size,
2760 instance.name, False, info)
2763 if device.physical_id is None:
2764 device.physical_id = new_id
2768 def _GenerateUniqueNames(cfg, exts):
2769 """Generate a suitable LV name.
2771 This will generate a logical volume name for the given instance.
2776 new_id = cfg.GenerateUniqueID()
2777 results.append("%s%s" % (new_id, val))
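# Illustrative example (hypothetical IDs): _GenerateUniqueNames(cfg,
# [".sda", ".sdb"]) returns one "<unique-id><extension>" string per requested
# extension, each with its own freshly generated ID, e.g.
#   ["5f9d2c...-a1b2.sda", "7e31ab...-c4d5.sdb"]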
2781 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2782 """Generate a drbd device complete with its children.
2785 port = cfg.AllocatePort()
2786 vgname = cfg.GetVGName()
2787 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2788 logical_id=(vgname, names[0]))
2789 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2790 logical_id=(vgname, names[1]))
2791 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2792 logical_id = (primary, secondary, port),
2793 children = [dev_data, dev_meta])
2797 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2798 """Generate a drbd8 device complete with its children.
2801 port = cfg.AllocatePort()
2802 vgname = cfg.GetVGName()
2803 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2804 logical_id=(vgname, names[0]))
2805 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2806 logical_id=(vgname, names[1]))
2807 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2808 logical_id = (primary, secondary, port),
2809 children = [dev_data, dev_meta],
2813 def _GenerateDiskTemplate(cfg, template_name,
2814 instance_name, primary_node,
2815 secondary_nodes, disk_sz, swap_sz):
2816 """Generate the entire disk layout for a given template type.
2819 #TODO: compute space requirements
2821 vgname = cfg.GetVGName()
2822 if template_name == constants.DT_DISKLESS:
2824 elif template_name == constants.DT_PLAIN:
2825 if len(secondary_nodes) != 0:
2826 raise errors.ProgrammerError("Wrong template configuration")
2828 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2829 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2830 logical_id=(vgname, names[0]),
2832 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2833 logical_id=(vgname, names[1]),
2835 disks = [sda_dev, sdb_dev]
2836 elif template_name == constants.DT_LOCAL_RAID1:
2837 if len(secondary_nodes) != 0:
2838 raise errors.ProgrammerError("Wrong template configuration")
2841 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2842 ".sdb_m1", ".sdb_m2"])
2843 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2844 logical_id=(vgname, names[0]))
2845 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2846 logical_id=(vgname, names[1]))
2847 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2849 children = [sda_dev_m1, sda_dev_m2])
2850 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2851 logical_id=(vgname, names[2]))
2852 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2853 logical_id=(vgname, names[3]))
2854 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2856 children = [sdb_dev_m1, sdb_dev_m2])
2857 disks = [md_sda_dev, md_sdb_dev]
2858 elif template_name == constants.DT_REMOTE_RAID1:
2859 if len(secondary_nodes) != 1:
2860 raise errors.ProgrammerError("Wrong template configuration")
2861 remote_node = secondary_nodes[0]
2862 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2863 ".sdb_data", ".sdb_meta"])
2864 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2865 disk_sz, names[0:2])
2866 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2867 children = [drbd_sda_dev], size=disk_sz)
2868 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2869 swap_sz, names[2:4])
2870 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2871 children = [drbd_sdb_dev], size=swap_sz)
2872 disks = [md_sda_dev, md_sdb_dev]
2873 elif template_name == constants.DT_DRBD8:
2874 if len(secondary_nodes) != 1:
2875 raise errors.ProgrammerError("Wrong template configuration")
2876 remote_node = secondary_nodes[0]
2877 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2878 ".sdb_data", ".sdb_meta"])
2879 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2880 disk_sz, names[0:2], "sda")
2881 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2882 swap_sz, names[2:4], "sdb")
2883 disks = [drbd_sda_dev, drbd_sdb_dev]
2885 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
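# Illustrative sketch of the result for the DT_DRBD8 template (example layout
# only): two top-level "sda"/"sdb" Disk objects of type LD_DRBD8, each with a
# data LV and a 128 MB metadata LV as children and a logical_id of
# (primary_node, secondary_node, allocated_port), as built by
# _GenerateDRBD8Branch above.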
2889 def _GetInstanceInfoText(instance):
2890 """Compute that text that should be added to the disk's metadata.
2893 return "originstname+%s" % instance.name
2896 def _CreateDisks(cfg, instance):
2897 """Create all disks for an instance.
2899 This abstracts away some work from AddInstance.
2902 instance: the instance object
2905 True or False showing the success of the creation process
2908 info = _GetInstanceInfoText(instance)
2910 for device in instance.disks:
2911 logger.Info("creating volume %s for instance %s" %
2912 (device.iv_name, instance.name))
2914 for secondary_node in instance.secondary_nodes:
2915 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2916 device, False, info):
2917 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2918 (device.iv_name, device, secondary_node))
2921 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2922 instance, device, info):
2923 logger.Error("failed to create volume %s on primary!" %
2929 def _RemoveDisks(instance, cfg):
2930 """Remove all disks for an instance.
2932 This abstracts away some work from `AddInstance()` and
2933 `RemoveInstance()`. Note that in case some of the devices couldn't
2934 be removed, the removal will continue with the other ones (compare
2935 with `_CreateDisks()`).
2938 instance: the instance object
2941 True or False showing the success of the removal process
2944 logger.Info("removing block devices for instance %s" % instance.name)
2947 for device in instance.disks:
2948 for node, disk in device.ComputeNodeTree(instance.primary_node):
2949 cfg.SetDiskID(disk, node)
2950 if not rpc.call_blockdev_remove(node, disk):
2951 logger.Error("could not remove block device %s on node %s,"
2952 " continuing anyway" %
2953 (device.iv_name, node))
2958 class LUCreateInstance(LogicalUnit):
2959 """Create an instance.
2962 HPATH = "instance-add"
2963 HTYPE = constants.HTYPE_INSTANCE
2964 _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2965 "disk_template", "swap_size", "mode", "start", "vcpus",
2966 "wait_for_sync", "ip_check", "mac"]
2968 def BuildHooksEnv(self):
2971 This runs on master, primary and secondary nodes of the instance.
2975 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2976 "INSTANCE_DISK_SIZE": self.op.disk_size,
2977 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2978 "INSTANCE_ADD_MODE": self.op.mode,
2980 if self.op.mode == constants.INSTANCE_IMPORT:
2981 env["INSTANCE_SRC_NODE"] = self.op.src_node
2982 env["INSTANCE_SRC_PATH"] = self.op.src_path
2983 env["INSTANCE_SRC_IMAGE"] = self.src_image
2985 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2986 primary_node=self.op.pnode,
2987 secondary_nodes=self.secondaries,
2988 status=self.instance_status,
2989 os_type=self.op.os_type,
2990 memory=self.op.mem_size,
2991 vcpus=self.op.vcpus,
2992 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2995 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3000 def CheckPrereq(self):
3001 """Check prerequisites.
3004 for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
3005 if not hasattr(self.op, attr):
3006 setattr(self.op, attr, None)
3008 if self.op.mode not in (constants.INSTANCE_CREATE,
3009 constants.INSTANCE_IMPORT):
3010 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3013 if self.op.mode == constants.INSTANCE_IMPORT:
3014 src_node = getattr(self.op, "src_node", None)
3015 src_path = getattr(self.op, "src_path", None)
3016 if src_node is None or src_path is None:
3017 raise errors.OpPrereqError("Importing an instance requires source"
3018 " node and path options")
3019 src_node_full = self.cfg.ExpandNodeName(src_node)
3020 if src_node_full is None:
3021 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3022 self.op.src_node = src_node = src_node_full
3024 if not os.path.isabs(src_path):
3025 raise errors.OpPrereqError("The source path must be absolute")
3027 export_info = rpc.call_export_info(src_node, src_path)
3030 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3032 if not export_info.has_section(constants.INISECT_EXP):
3033 raise errors.ProgrammerError("Corrupted export config")
3035 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3036 if (int(ei_version) != constants.EXPORT_VERSION):
3037 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3038 (ei_version, constants.EXPORT_VERSION))
3040 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3041 raise errors.OpPrereqError("Can't import instance with more than"
3044 # FIXME: are the old os-es, disk sizes, etc. useful?
3045 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3046 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3048 self.src_image = diskimage
3049 else: # INSTANCE_CREATE
3050 if getattr(self.op, "os_type", None) is None:
3051 raise errors.OpPrereqError("No guest OS specified")
3053 # check primary node
3054 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3056 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3058 self.op.pnode = pnode.name
3060 self.secondaries = []
3061 # disk template and mirror node verification
3062 if self.op.disk_template not in constants.DISK_TEMPLATES:
3063 raise errors.OpPrereqError("Invalid disk template name")
3065 if self.op.disk_template in constants.DTS_NET_MIRROR:
3066 if getattr(self.op, "snode", None) is None:
3067 raise errors.OpPrereqError("The networked disk templates need"
3070 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3071 if snode_name is None:
3072 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3074 elif snode_name == pnode.name:
3075 raise errors.OpPrereqError("The secondary node cannot be"
3076 " the primary node.")
3077 self.secondaries.append(snode_name)
3079 # Required free disk space as a function of disk and swap space
3081 constants.DT_DISKLESS: None,
3082 constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
3083 constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
3084 # 256 MB are added for drbd metadata, 128MB for each drbd device
3085 constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
3086 constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
3089 if self.op.disk_template not in req_size_dict:
3090 raise errors.ProgrammerError("Disk template '%s' size requirement"
3091 " is unknown" % self.op.disk_template)
3093 req_size = req_size_dict[self.op.disk_template]
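# Worked example (illustrative sizes only): with disk_size=10240 and
# swap_size=4096, DT_PLAIN requires 14336 MB, DT_LOCAL_RAID1 requires
# 28672 MB, and the DRBD-based templates require 14336 + 256 = 14592 MB of
# free space in the volume group.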
3095 # Check lv size requirements
3096 if req_size is not None:
3097 nodenames = [pnode.name] + self.secondaries
3098 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3099 for node in nodenames:
3100 info = nodeinfo.get(node, None)
3102 raise errors.OpPrereqError("Cannot get current information"
3103 " from node '%s'" % nodeinfo)
3104 vg_free = info.get('vg_free', None)
3105 if not isinstance(vg_free, int):
3106 raise errors.OpPrereqError("Can't compute free disk space on"
3108 if req_size > info['vg_free']:
3109 raise errors.OpPrereqError("Not enough disk space on target node %s."
3110 " %d MB available, %d MB required" %
3111 (node, info['vg_free'], req_size))
3114 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3116 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3117 " primary node" % self.op.os_type)
3119 if self.op.kernel_path == constants.VALUE_NONE:
3120 raise errors.OpPrereqError("Can't set instance kernel to none")
3122 # instance verification
3123 hostname1 = utils.HostInfo(self.op.instance_name)
3125 self.op.instance_name = instance_name = hostname1.name
3126 instance_list = self.cfg.GetInstanceList()
3127 if instance_name in instance_list:
3128 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3131 ip = getattr(self.op, "ip", None)
3132 if ip is None or ip.lower() == "none":
3134 elif ip.lower() == "auto":
3135 inst_ip = hostname1.ip
3137 if not utils.IsValidIP(ip):
3138 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3139 " like a valid IP" % ip)
3141 self.inst_ip = inst_ip
3143 if self.op.start and not self.op.ip_check:
3144 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3145 " adding an instance in start mode")
3147 if self.op.ip_check:
3148 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3149 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3150 (hostname1.ip, instance_name))
3152 # MAC address verification
3153 if self.op.mac != "auto":
3154 if not utils.IsValidMac(self.op.mac.lower()):
3155 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3158 # bridge verification
3159 bridge = getattr(self.op, "bridge", None)
3161 self.op.bridge = self.cfg.GetDefBridge()
3163 self.op.bridge = bridge
3165 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3166 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3167 " destination node '%s'" %
3168 (self.op.bridge, pnode.name))
3170 # boot order verification
3171 if self.op.hvm_boot_order is not None:
3172 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3173 raise errors.OpPrereqError("invalid boot order specified,"
3174 " must be one or more of [acdn]")
3177 self.instance_status = 'up'
3179 self.instance_status = 'down'
3181 def Exec(self, feedback_fn):
3182 """Create and add the instance to the cluster.
3185 instance = self.op.instance_name
3186 pnode_name = self.pnode.name
3188 if self.op.mac == "auto":
3189 mac_address = self.cfg.GenerateMAC()
3191 mac_address = self.op.mac
3193 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3194 if self.inst_ip is not None:
3195 nic.ip = self.inst_ip
3197 ht_kind = self.sstore.GetHypervisorType()
3198 if ht_kind in constants.HTS_REQ_PORT:
3199 network_port = self.cfg.AllocatePort()
3203 disks = _GenerateDiskTemplate(self.cfg,
3204 self.op.disk_template,
3205 instance, pnode_name,
3206 self.secondaries, self.op.disk_size,
3209 iobj = objects.Instance(name=instance, os=self.op.os_type,
3210 primary_node=pnode_name,
3211 memory=self.op.mem_size,
3212 vcpus=self.op.vcpus,
3213 nics=[nic], disks=disks,
3214 disk_template=self.op.disk_template,
3215 status=self.instance_status,
3216 network_port=network_port,
3217 kernel_path=self.op.kernel_path,
3218 initrd_path=self.op.initrd_path,
3219 hvm_boot_order=self.op.hvm_boot_order,
3222 feedback_fn("* creating instance disks...")
3223 if not _CreateDisks(self.cfg, iobj):
3224 _RemoveDisks(iobj, self.cfg)
3225 raise errors.OpExecError("Device creation failed, reverting...")
3227 feedback_fn("adding instance %s to cluster config" % instance)
3229 self.cfg.AddInstance(iobj)
3231 if self.op.wait_for_sync:
3232 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3233 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3234 # make sure the disks are not degraded (still sync-ing is ok)
3236 feedback_fn("* checking mirrors status")
3237 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3242 _RemoveDisks(iobj, self.cfg)
3243 self.cfg.RemoveInstance(iobj.name)
3244 raise errors.OpExecError("There are some degraded disks for"
3247 feedback_fn("creating os for instance %s on node %s" %
3248 (instance, pnode_name))
3250 if iobj.disk_template != constants.DT_DISKLESS:
3251 if self.op.mode == constants.INSTANCE_CREATE:
3252 feedback_fn("* running the instance OS create scripts...")
3253 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3254 raise errors.OpExecError("could not add os for instance %s"
3256 (instance, pnode_name))
3258 elif self.op.mode == constants.INSTANCE_IMPORT:
3259 feedback_fn("* running the instance OS import scripts...")
3260 src_node = self.op.src_node
3261 src_image = self.src_image
3262 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3263 src_node, src_image):
3264 raise errors.OpExecError("Could not import os for instance"
3266 (instance, pnode_name))
3268 # also checked in the prereq part
3269 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3273 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3274 feedback_fn("* starting instance...")
3275 if not rpc.call_instance_start(pnode_name, iobj, None):
3276 raise errors.OpExecError("Could not start instance")
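# Minimal illustrative sketch of an instance creation request (hedged: field
# values are examples only and it assumes the corresponding opcode in
# opcodes.py exposes the _OP_REQP fields listed above plus the optional ones
# read in CheckPrereq):
#
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mem_size=128, disk_size=10240, swap_size=4096,
#                                 disk_template=constants.DT_DRBD8,
#                                 pnode="node1.example.com",
#                                 snode="node2.example.com",
#                                 os_type="debian-etch",
#                                 mode=constants.INSTANCE_CREATE,
#                                 start=True, vcpus=1, wait_for_sync=True,
#                                 ip_check=True, mac="auto")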
3279 class LUConnectConsole(NoHooksLU):
3280 """Connect to an instance's console.
3282 This is somewhat special in that it returns the command line that
3283 you need to run on the master node in order to connect to the console.
3287 _OP_REQP = ["instance_name"]
3289 def CheckPrereq(self):
3290 """Check prerequisites.
3292 This checks that the instance is in the cluster.
3295 instance = self.cfg.GetInstanceInfo(
3296 self.cfg.ExpandInstanceName(self.op.instance_name))
3297 if instance is None:
3298 raise errors.OpPrereqError("Instance '%s' not known" %
3299 self.op.instance_name)
3300 self.instance = instance
3302 def Exec(self, feedback_fn):
3303 """Connect to the console of an instance
3306 instance = self.instance
3307 node = instance.primary_node
3309 node_insts = rpc.call_instance_list([node])[node]
3310 if node_insts is False:
3311 raise errors.OpExecError("Can't connect to node %s." % node)
3313 if instance.name not in node_insts:
3314 raise errors.OpExecError("Instance %s is not running." % instance.name)
3316 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3318 hyper = hypervisor.GetHypervisor()
3319 console_cmd = hyper.GetShellCommandForConsole(instance)
3321 argv = ["ssh", "-q", "-t"]
3322 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3323 argv.extend(ssh.BATCH_MODE_OPTS)
3325 argv.append(console_cmd)
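# Illustrative result (hypothetical values): for a Xen-based instance this
# builds a command line along the lines of
#   ["ssh", "-q", "-t", <known-hosts opts>, <batch-mode opts>,
#    "node1.example.com", "xm console inst1.example.com"]
# which the caller is expected to execute from the master node.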
3329 class LUAddMDDRBDComponent(LogicalUnit):
3330 """Adda new mirror member to an instance's disk.
3333 HPATH = "mirror-add"
3334 HTYPE = constants.HTYPE_INSTANCE
3335 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3337 def BuildHooksEnv(self):
3340 This runs on the master, the primary and all the secondaries.
3344 "NEW_SECONDARY": self.op.remote_node,
3345 "DISK_NAME": self.op.disk_name,
3347 env.update(_BuildInstanceHookEnvByObject(self.instance))
3348 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3349 self.op.remote_node,] + list(self.instance.secondary_nodes)
3352 def CheckPrereq(self):
3353 """Check prerequisites.
3355 This checks that the instance is in the cluster.
3358 instance = self.cfg.GetInstanceInfo(
3359 self.cfg.ExpandInstanceName(self.op.instance_name))
3360 if instance is None:
3361 raise errors.OpPrereqError("Instance '%s' not known" %
3362 self.op.instance_name)
3363 self.instance = instance
3365 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3366 if remote_node is None:
3367 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3368 self.remote_node = remote_node
3370 if remote_node == instance.primary_node:
3371 raise errors.OpPrereqError("The specified node is the primary node of"
3374 if instance.disk_template != constants.DT_REMOTE_RAID1:
3375 raise errors.OpPrereqError("Instance's disk layout is not"
3377 for disk in instance.disks:
3378 if disk.iv_name == self.op.disk_name:
3381 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3382 " instance." % self.op.disk_name)
3383 if len(disk.children) > 1:
3384 raise errors.OpPrereqError("The device already has two slave devices."
3385 " This would create a 3-disk raid1 which we"
3389 def Exec(self, feedback_fn):
3390 """Add the mirror component
3394 instance = self.instance
3396 remote_node = self.remote_node
3397 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3398 names = _GenerateUniqueNames(self.cfg, lv_names)
3399 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3400 remote_node, disk.size, names)
3402 logger.Info("adding new mirror component on secondary")
3404 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3406 _GetInstanceInfoText(instance)):
3407 raise errors.OpExecError("Failed to create new component on secondary"
3408 " node %s" % remote_node)
3410 logger.Info("adding new mirror component on primary")
3412 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3414 _GetInstanceInfoText(instance)):
3415 # remove secondary dev
3416 self.cfg.SetDiskID(new_drbd, remote_node)
3417 rpc.call_blockdev_remove(remote_node, new_drbd)
3418 raise errors.OpExecError("Failed to create volume on primary")
3420 # the device exists now
3421 # call the primary node to add the mirror to md
3422 logger.Info("adding new mirror component to md")
3423 if not rpc.call_blockdev_addchildren(instance.primary_node,
3425 logger.Error("Can't add mirror compoment to md!")
3426 self.cfg.SetDiskID(new_drbd, remote_node)
3427 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3428 logger.Error("Can't rollback on secondary")
3429 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3430 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3431 logger.Error("Can't rollback on primary")
3432 raise errors.OpExecError("Can't add mirror component to md array")
3434 disk.children.append(new_drbd)
3436 self.cfg.AddInstance(instance)
3438 _WaitForSync(self.cfg, instance, self.proc)
3443 class LURemoveMDDRBDComponent(LogicalUnit):
3444 """Remove a component from a remote_raid1 disk.
3447 HPATH = "mirror-remove"
3448 HTYPE = constants.HTYPE_INSTANCE
3449 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3451 def BuildHooksEnv(self):
3454 This runs on the master, the primary and all the secondaries.
3458 "DISK_NAME": self.op.disk_name,
3459 "DISK_ID": self.op.disk_id,
3460 "OLD_SECONDARY": self.old_secondary,
3462 env.update(_BuildInstanceHookEnvByObject(self.instance))
3463 nl = [self.sstore.GetMasterNode(),
3464 self.instance.primary_node] + list(self.instance.secondary_nodes)
3467 def CheckPrereq(self):
3468 """Check prerequisites.
3470 This checks that the instance is in the cluster.
3473 instance = self.cfg.GetInstanceInfo(
3474 self.cfg.ExpandInstanceName(self.op.instance_name))
3475 if instance is None:
3476 raise errors.OpPrereqError("Instance '%s' not known" %
3477 self.op.instance_name)
3478 self.instance = instance
3480 if instance.disk_template != constants.DT_REMOTE_RAID1:
3481 raise errors.OpPrereqError("Instance's disk layout is not"
3483 for disk in instance.disks:
3484 if disk.iv_name == self.op.disk_name:
3487 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3488 " instance." % self.op.disk_name)
3489 for child in disk.children:
3490 if (child.dev_type == constants.LD_DRBD7 and
3491 child.logical_id[2] == self.op.disk_id):
3494 raise errors.OpPrereqError("Can't find the device with this port.")
3496 if len(disk.children) < 2:
3497 raise errors.OpPrereqError("Cannot remove the last component from"
3501 if self.child.logical_id[0] == instance.primary_node:
3505 self.old_secondary = self.child.logical_id[oid]
3507 def Exec(self, feedback_fn):
3508 """Remove the mirror component
3511 instance = self.instance
3514 logger.Info("remove mirror component")
3515 self.cfg.SetDiskID(disk, instance.primary_node)
3516 if not rpc.call_blockdev_removechildren(instance.primary_node,
3518 raise errors.OpExecError("Can't remove child from mirror.")
3520 for node in child.logical_id[:2]:
3521 self.cfg.SetDiskID(child, node)
3522 if not rpc.call_blockdev_remove(node, child):
3523 logger.Error("Warning: failed to remove device from node %s,"
3524 " continuing operation." % node)
3526 disk.children.remove(child)
3527 self.cfg.AddInstance(instance)
3530 class LUReplaceDisks(LogicalUnit):
3531 """Replace the disks of an instance.
3534 HPATH = "mirrors-replace"
3535 HTYPE = constants.HTYPE_INSTANCE
3536 _OP_REQP = ["instance_name", "mode", "disks"]
3538 def BuildHooksEnv(self):
3541 This runs on the master, the primary and all the secondaries.
3545 "MODE": self.op.mode,
3546 "NEW_SECONDARY": self.op.remote_node,
3547 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3549 env.update(_BuildInstanceHookEnvByObject(self.instance))
3551 self.sstore.GetMasterNode(),
3552 self.instance.primary_node,
3554 if self.op.remote_node is not None:
3555 nl.append(self.op.remote_node)
3558 def CheckPrereq(self):
3559 """Check prerequisites.
3561 This checks that the instance is in the cluster.
3564 instance = self.cfg.GetInstanceInfo(
3565 self.cfg.ExpandInstanceName(self.op.instance_name))
3566 if instance is None:
3567 raise errors.OpPrereqError("Instance '%s' not known" %
3568 self.op.instance_name)
3569 self.instance = instance
3570 self.op.instance_name = instance.name
3572 if instance.disk_template not in constants.DTS_NET_MIRROR:
3573 raise errors.OpPrereqError("Instance's disk layout is not"
3574 " network mirrored.")
3576 if len(instance.secondary_nodes) != 1:
3577 raise errors.OpPrereqError("The instance has a strange layout,"
3578 " expected one secondary but found %d" %
3579 len(instance.secondary_nodes))
3581 self.sec_node = instance.secondary_nodes[0]
3583 remote_node = getattr(self.op, "remote_node", None)
3584 if remote_node is not None:
3585 remote_node = self.cfg.ExpandNodeName(remote_node)
3586 if remote_node is None:
3587 raise errors.OpPrereqError("Node '%s' not known" %
3588 self.op.remote_node)
3589 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3591 self.remote_node_info = None
3592 if remote_node == instance.primary_node:
3593 raise errors.OpPrereqError("The specified node is the primary node of"
3595 elif remote_node == self.sec_node:
3596 if self.op.mode == constants.REPLACE_DISK_SEC:
3597 # this is for DRBD8, where we can't execute the same mode of
3598 # replacement as for drbd7 (no different port allocated)
3599 raise errors.OpPrereqError("Same secondary given, cannot execute"
3601 # the user gave the current secondary, switch to
3602 # 'no-replace-secondary' mode for drbd7
3604 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3605 self.op.mode != constants.REPLACE_DISK_ALL):
3606 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3607 " disks replacement, not individual ones")
3608 if instance.disk_template == constants.DT_DRBD8:
3609 if (self.op.mode == constants.REPLACE_DISK_ALL and
3610 remote_node is not None):
3611 # switch to replace secondary mode
3612 self.op.mode = constants.REPLACE_DISK_SEC
3614 if self.op.mode == constants.REPLACE_DISK_ALL:
3615 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3616 " secondary disk replacement, not"
3618 elif self.op.mode == constants.REPLACE_DISK_PRI:
3619 if remote_node is not None:
3620 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3621 " the secondary while doing a primary"
3622 " node disk replacement")
3623 self.tgt_node = instance.primary_node
3624 self.oth_node = instance.secondary_nodes[0]
3625 elif self.op.mode == constants.REPLACE_DISK_SEC:
3626 self.new_node = remote_node # this can be None, in which case
3627 # we don't change the secondary
3628 self.tgt_node = instance.secondary_nodes[0]
3629 self.oth_node = instance.primary_node
3631 raise errors.ProgrammerError("Unhandled disk replace mode")
3633 for name in self.op.disks:
3634 if instance.FindDisk(name) is None:
3635 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3636 (name, instance.name))
3637 self.op.remote_node = remote_node
3639 def _ExecRR1(self, feedback_fn):
3640 """Replace the disks of an instance.
3643 instance = self.instance
3646 if self.op.remote_node is None:
3647 remote_node = self.sec_node
3649 remote_node = self.op.remote_node
3651 for dev in instance.disks:
3653 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3654 names = _GenerateUniqueNames(cfg, lv_names)
3655 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3656 remote_node, size, names)
3657 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3658 logger.Info("adding new mirror component on secondary for %s" %
3661 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3663 _GetInstanceInfoText(instance)):
3664 raise errors.OpExecError("Failed to create new component on secondary"
3665 " node %s. Full abort, cleanup manually!" %
3668 logger.Info("adding new mirror component on primary")
3670 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3672 _GetInstanceInfoText(instance)):
3673 # remove secondary dev
3674 cfg.SetDiskID(new_drbd, remote_node)
3675 rpc.call_blockdev_remove(remote_node, new_drbd)
3676 raise errors.OpExecError("Failed to create volume on primary!"
3677 " Full abort, cleanup manually!!")
3679 # the device exists now
3680 # call the primary node to add the mirror to md
3681 logger.Info("adding new mirror component to md")
3682 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3684 logger.Error("Can't add mirror compoment to md!")
3685 cfg.SetDiskID(new_drbd, remote_node)
3686 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3687 logger.Error("Can't rollback on secondary")
3688 cfg.SetDiskID(new_drbd, instance.primary_node)
3689 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3690 logger.Error("Can't rollback on primary")
3691 raise errors.OpExecError("Full abort, cleanup manually!!")
3693 dev.children.append(new_drbd)
3694 cfg.AddInstance(instance)
3696 # this can fail as the old devices are degraded and _WaitForSync
3697 # does a combined result over all disks, so we don't check its
3699 _WaitForSync(cfg, instance, self.proc, unlock=True)
3701 # so check manually all the devices
3702 for name in iv_names:
3703 dev, child, new_drbd = iv_names[name]
3704 cfg.SetDiskID(dev, instance.primary_node)
3705 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3707 raise errors.OpExecError("MD device %s is degraded!" % name)
3708 cfg.SetDiskID(new_drbd, instance.primary_node)
3709 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3711 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3713 for name in iv_names:
3714 dev, child, new_drbd = iv_names[name]
3715 logger.Info("remove mirror %s component" % name)
3716 cfg.SetDiskID(dev, instance.primary_node)
3717 if not rpc.call_blockdev_removechildren(instance.primary_node,
3719 logger.Error("Can't remove child from mirror, aborting"
3720 " *this device cleanup*.\nYou need to cleanup manually!!")
3723 for node in child.logical_id[:2]:
3724 logger.Info("remove child device on %s" % node)
3725 cfg.SetDiskID(child, node)
3726 if not rpc.call_blockdev_remove(node, child):
3727 logger.Error("Warning: failed to remove device from node %s,"
3728 " continuing operation." % node)
3730 dev.children.remove(child)
3732 cfg.AddInstance(instance)
3734 def _ExecD8DiskOnly(self, feedback_fn):
3735 """Replace a disk on the primary or secondary for dbrd8.
3737 The algorithm for replace is quite complicated:
3738 - for each disk to be replaced:
3739 - create new LVs on the target node with unique names
3740 - detach old LVs from the drbd device
3741 - rename old LVs to name_replaced.<time_t>
3742 - rename new LVs to old LVs
3743 - attach the new LVs (with the old names now) to the drbd device
3744 - wait for sync across all devices
3745 - for each modified disk:
3746 - remove old LVs (which have the name name_replaced.<time_t>)
3748 Failures are not very well handled.
3752 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3753 instance = self.instance
3755 vgname = self.cfg.GetVGName()
3758 tgt_node = self.tgt_node
3759 oth_node = self.oth_node
3761 # Step: check device activation
3762 self.proc.LogStep(1, steps_total, "check device existence")
3763 info("checking volume groups")
3764 my_vg = cfg.GetVGName()
3765 results = rpc.call_vg_list([oth_node, tgt_node])
3767 raise errors.OpExecError("Can't list volume groups on the nodes")
3768 for node in oth_node, tgt_node:
3769 res = results.get(node, False)
3770 if not res or my_vg not in res:
3771 raise errors.OpExecError("Volume group '%s' not found on %s" %
3773 for dev in instance.disks:
3774 if not dev.iv_name in self.op.disks:
3776 for node in tgt_node, oth_node:
3777 info("checking %s on %s" % (dev.iv_name, node))
3778 cfg.SetDiskID(dev, node)
3779 if not rpc.call_blockdev_find(node, dev):
3780 raise errors.OpExecError("Can't find device %s on node %s" %
3781 (dev.iv_name, node))
3783 # Step: check other node consistency
3784 self.proc.LogStep(2, steps_total, "check peer consistency")
3785 for dev in instance.disks:
3786 if not dev.iv_name in self.op.disks:
3788 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3789 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3790 oth_node==instance.primary_node):
3791 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3792 " to replace disks on this node (%s)" %
3793 (oth_node, tgt_node))
3795 # Step: create new storage
3796 self.proc.LogStep(3, steps_total, "allocate new storage")
3797 for dev in instance.disks:
3798 if not dev.iv_name in self.op.disks:
3801 cfg.SetDiskID(dev, tgt_node)
3802 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3803 names = _GenerateUniqueNames(cfg, lv_names)
3804 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3805 logical_id=(vgname, names[0]))
3806 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3807 logical_id=(vgname, names[1]))
3808 new_lvs = [lv_data, lv_meta]
3809 old_lvs = dev.children
3810 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3811 info("creating new local storage on %s for %s" %
3812 (tgt_node, dev.iv_name))
3813 # since we *always* want to create this LV, we use the
3814 # _Create...OnPrimary (which forces the creation), even if we
3815 # are talking about the secondary node
3816 for new_lv in new_lvs:
3817 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3818 _GetInstanceInfoText(instance)):
3819 raise errors.OpExecError("Failed to create new LV named '%s' on"
3821 (new_lv.logical_id[1], tgt_node))
3823 # Step: for each lv, detach+rename*2+attach
3824 self.proc.LogStep(4, steps_total, "change drbd configuration")
3825 for dev, old_lvs, new_lvs in iv_names.itervalues():
3826 info("detaching %s drbd from local storage" % dev.iv_name)
3827 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3828 raise errors.OpExecError("Can't detach drbd from local storage on node"
3829 " %s for device %s" % (tgt_node, dev.iv_name))
3831 #cfg.Update(instance)
3833 # ok, we created the new LVs, so now we know we have the needed
3834 # storage; as such, we proceed on the target node to rename
3835 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3836 # using the assumption that logical_id == physical_id (which in
3837 # turn is the unique_id on that node)
3839 # FIXME(iustin): use a better name for the replaced LVs
3840 temp_suffix = int(time.time())
3841 ren_fn = lambda d, suff: (d.physical_id[0],
3842 d.physical_id[1] + "_replaced-%s" % suff)
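# Illustrative example (hypothetical values): for an LV whose physical_id is
# ("xenvg", "abc123.sda_data") and a temp_suffix of 1171234567, ren_fn yields
# ("xenvg", "abc123.sda_data_replaced-1171234567"); only the LV name part is
# suffixed, the volume group name is kept.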
3843 # build the rename list based on what LVs exist on the node
3845 for to_ren in old_lvs:
3846 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3847 if find_res is not None: # device exists
3848 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3850 info("renaming the old LVs on the target node")
3851 if not rpc.call_blockdev_rename(tgt_node, rlist):
3852 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3853 # now we rename the new LVs to the old LVs
3854 info("renaming the new LVs on the target node")
3855 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3856 if not rpc.call_blockdev_rename(tgt_node, rlist):
3857 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3859 for old, new in zip(old_lvs, new_lvs):
3860 new.logical_id = old.logical_id
3861 cfg.SetDiskID(new, tgt_node)
3863 for disk in old_lvs:
3864 disk.logical_id = ren_fn(disk, temp_suffix)
3865 cfg.SetDiskID(disk, tgt_node)
3867 # now that the new lvs have the old name, we can add them to the device
3868 info("adding new mirror component on %s" % tgt_node)
3869 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3870 for new_lv in new_lvs:
3871 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3872 warning("Can't rollback device %s", hint="manually cleanup unused"
3874 raise errors.OpExecError("Can't add local storage to drbd")
3876 dev.children = new_lvs
3877 cfg.Update(instance)
3879 # Step: wait for sync
3881 # this can fail as the old devices are degraded and _WaitForSync
3882 # does a combined result over all disks, so we don't check its
3884 self.proc.LogStep(5, steps_total, "sync devices")
3885 _WaitForSync(cfg, instance, self.proc, unlock=True)
3887 # so check manually all the devices
3888 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3889 cfg.SetDiskID(dev, instance.primary_node)
3890 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3892 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3894 # Step: remove old storage
3895 self.proc.LogStep(6, steps_total, "removing old storage")
3896 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3897 info("remove logical volumes for %s" % name)
3899 cfg.SetDiskID(lv, tgt_node)
3900 if not rpc.call_blockdev_remove(tgt_node, lv):
3901 warning("Can't remove old LV", hint="manually remove unused LVs")
3904 def _ExecD8Secondary(self, feedback_fn):
3905 """Replace the secondary node for drbd8.
3907 The algorithm for replace is quite complicated:
3908 - for all disks of the instance:
3909 - create new LVs on the new node with same names
3910 - shutdown the drbd device on the old secondary
3911 - disconnect the drbd network on the primary
3912 - create the drbd device on the new secondary
3913 - network attach the drbd on the primary, using an artifice:
3914 the drbd code for Attach() will connect to the network if it
3915 finds a device which is connected to the good local disks but not network enabled
3917 - wait for sync across all devices
3918 - remove all disks from the old secondary
3920 Failures are not very well handled.
3924 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3925 instance = self.instance
3927 vgname = self.cfg.GetVGName()
3930 old_node = self.tgt_node
3931 new_node = self.new_node
3932 pri_node = instance.primary_node
3934 # Step: check device activation
3935 self.proc.LogStep(1, steps_total, "check device existence")
3936 info("checking volume groups")
3937 my_vg = cfg.GetVGName()
3938 results = rpc.call_vg_list([pri_node, new_node])
3940 raise errors.OpExecError("Can't list volume groups on the nodes")
3941 for node in pri_node, new_node:
3942 res = results.get(node, False)
3943 if not res or my_vg not in res:
3944 raise errors.OpExecError("Volume group '%s' not found on %s" %
3946 for dev in instance.disks:
3947 if not dev.iv_name in self.op.disks:
3949 info("checking %s on %s" % (dev.iv_name, pri_node))
3950 cfg.SetDiskID(dev, pri_node)
3951 if not rpc.call_blockdev_find(pri_node, dev):
3952 raise errors.OpExecError("Can't find device %s on node %s" %
3953 (dev.iv_name, pri_node))
3955 # Step: check other node consistency
3956 self.proc.LogStep(2, steps_total, "check peer consistency")
3957 for dev in instance.disks:
3958 if not dev.iv_name in self.op.disks:
3960 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3961 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3962 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3963 " unsafe to replace the secondary" %
3966 # Step: create new storage
3967 self.proc.LogStep(3, steps_total, "allocate new storage")
3968 for dev in instance.disks:
3970 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3971 # since we *always* want to create this LV, we use the
3972 # _Create...OnPrimary (which forces the creation), even if we
3973 # are talking about the secondary node
3974 for new_lv in dev.children:
3975 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3976 _GetInstanceInfoText(instance)):
3977 raise errors.OpExecError("Failed to create new LV named '%s' on"
3979 (new_lv.logical_id[1], new_node))
3981 iv_names[dev.iv_name] = (dev, dev.children)
3983 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3984 for dev in instance.disks:
3986 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3987 # create new devices on new_node
3988 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3989 logical_id=(pri_node, new_node,
3991 children=dev.children)
3992 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3994 _GetInstanceInfoText(instance)):
3995 raise errors.OpExecError("Failed to create new DRBD on"
3996 " node '%s'" % new_node)
3998 for dev in instance.disks:
3999 # we have new devices, shutdown the drbd on the old secondary
4000 info("shutting down drbd for %s on old node" % dev.iv_name)
4001 cfg.SetDiskID(dev, old_node)
4002 if not rpc.call_blockdev_shutdown(old_node, dev):
4003 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4004 hint="Please cleanup this device manually as soon as possible")
4006 info("detaching primary drbds from the network (=> standalone)")
4008 for dev in instance.disks:
4009 cfg.SetDiskID(dev, pri_node)
4010 # set the physical (unique in bdev terms) id to None, meaning
4011 # detach from network
4012 dev.physical_id = (None,) * len(dev.physical_id)
4013 # and 'find' the device, which will 'fix' it to match the
4015 if rpc.call_blockdev_find(pri_node, dev):
4018 warning("Failed to detach drbd %s from network, unusual case" %
4022 # no detaches succeeded (very unlikely)
4023 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4025 # if we managed to detach at least one, we update all the disks of
4026 # the instance to point to the new secondary
4027 info("updating instance configuration")
4028 for dev in instance.disks:
4029 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4030 cfg.SetDiskID(dev, pri_node)
4031 cfg.Update(instance)
4033 # and now perform the drbd attach
4034 info("attaching primary drbds to new secondary (standalone => connected)")
4036 for dev in instance.disks:
4037 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4038 # since the attach is smart, it's enough to 'find' the device,
4039 # it will automatically activate the network, if the physical_id
4041 cfg.SetDiskID(dev, pri_node)
4042 if not rpc.call_blockdev_find(pri_node, dev):
4043 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4044 "please do a gnt-instance info to see the status of disks")
4046 # this can fail as the old devices are degraded and _WaitForSync
4047 # does a combined result over all disks, so we don't check its
4049 self.proc.LogStep(5, steps_total, "sync devices")
4050 _WaitForSync(cfg, instance, self.proc, unlock=True)
4052 # so check manually all the devices
4053 for name, (dev, old_lvs) in iv_names.iteritems():
4054 cfg.SetDiskID(dev, pri_node)
4055 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4057 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4059 self.proc.LogStep(6, steps_total, "removing old storage")
4060 for name, (dev, old_lvs) in iv_names.iteritems():
4061 info("remove logical volumes for %s" % name)
4063 cfg.SetDiskID(lv, old_node)
4064 if not rpc.call_blockdev_remove(old_node, lv):
4065 warning("Can't remove LV on old secondary",
4066 hint="Cleanup stale volumes by hand")
  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        "hvm_boot_order": instance.hvm_boot_order,
        }

      result[instance.name] = idict

    return result
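
# For reference, LUQueryInstanceData.Exec returns a dict keyed by instance
# name; a (hypothetical) entry looks roughly like:
#   {"instance1.example.com": {"name": "instance1.example.com",
#                              "config_state": "up", "run_state": "down",
#                              "pnode": "node1.example.com", "snodes": [],
#                              "memory": 128, "vcpus": 1, "nics": [...],
#                              "disks": [...], ...}}
# where each disk entry is the dict built by _ComputeDiskStatus above.
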
class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result
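
# Exec above returns the list of (parameter, new value) pairs that were
# applied, e.g. (illustrative values) [("mem", 512), ("vcpus", 2)]; per the
# docstring, the changes only take effect at the next instance restart.
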
class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
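
# The result mirrors rpc.call_export_list, e.g. (hypothetical names):
#   {"node1.example.com": ["instance1.example.com"]}
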
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)
    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()
class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
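
# Matches are returned as (path, tag) pairs, e.g. (hypothetical values):
#   [("/cluster", "production"), ("/instances/instance1.example.com", "web")]
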
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """
    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))
def _AllocatorGetClusterData(cfg, sstore):
  """Compute the generic allocator input data.

  This is the data that is independent of the actual operation.

  """
  # cluster data
  data = {
    "cluster_name": sstore.GetClusterName(),
    "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
    # we don't have job IDs
    }

  # node data
  node_results = {}
  node_list = cfg.GetNodeList()
  node_data = rpc.call_node_info(node_list, cfg.GetVGName())
  for nname in node_list:
    ninfo = cfg.GetNodeInfo(nname)
    if nname not in node_data or not isinstance(node_data[nname], dict):
      raise errors.OpExecError("Can't get data for node %s" % nname)
    remote_info = node_data[nname]
    for attr in ['memory_total', 'memory_free',
                 'vg_size', 'vg_free']:
      if attr not in remote_info:
        raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                 (nname, attr))
      try:
        int(remote_info[attr])
      except ValueError, err:
        raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                 " %s" % (nname, attr, str(err)))
    pnr = {
      "tags": list(ninfo.GetTags()),
      "total_memory": utils.TryConvert(int, remote_info['memory_total']),
      "free_memory": utils.TryConvert(int, remote_info['memory_free']),
      "total_disk": utils.TryConvert(int, remote_info['vg_size']),
      "free_disk": utils.TryConvert(int, remote_info['vg_free']),
      "primary_ip": ninfo.primary_ip,
      "secondary_ip": ninfo.secondary_ip,
      }
    node_results[nname] = pnr
  data["nodes"] = node_results

  # instance data
  instance_data = {}
  i_list = cfg.GetInstanceList()
  for iname in i_list:
    iinfo = cfg.GetInstanceInfo(iname)
    nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                for n in iinfo.nics]
    pir = {
      "tags": list(iinfo.GetTags()),
      "should_run": iinfo.status == "up",
      "vcpus": iinfo.vcpus,
      "memory": iinfo.memory,
      "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
      "nics": nic_data,
      "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
      "disk_template": iinfo.disk_template,
      }
    instance_data[iname] = pir

  data["instances"] = instance_data

  return data
def _AllocatorAddNewInstance(data, op):
  """Add new instance data to allocator structure.

  This in combination with _AllocatorGetClusterData will create the
  correct structure needed as input for the allocator.

  The checks for the completeness of the opcode must have already been
  done.

  """
  request = {
    "type": "allocate",
    "name": op.name,
    "disk_template": op.disk_template,
    "memory": op.mem_size,
    }
  data["request"] = request


def _AllocatorAddRelocateInstance(data, op):
  """Add relocate instance data to allocator structure.

  This in combination with _AllocatorGetClusterData will create the
  correct structure needed as input for the allocator.

  The checks for the completeness of the opcode must have already been
  done.

  """
  request = {
    "type": "replace_secondary",
    "name": op.name,
    }
  data["request"] = request
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the test direction and mode.

    """
    if self.op.mode == constants.ALF_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.ALF_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.ALF_DIR_OUT:
      if not hasattr(self.op, "allocator"):
        raise errors.OpPrereqError("Missing allocator name")
      raise errors.OpPrereqError("Allocator out mode not supported yet")
    elif self.op.direction != constants.ALF_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    data = _AllocatorGetClusterData(self.cfg, self.sstore)
    if self.op.mode == constants.ALF_MODE_ALLOC:
      _AllocatorAddNewInstance(data, self.op)
    else:
      _AllocatorAddRelocateInstance(data, self.op)

    if _JSON_INDENT is None:
      text = simplejson.dumps(data)
    else:
      text = simplejson.dumps(data, indent=_JSON_INDENT)
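    # both branches serialize the same data; _JSON_INDENT only controls
    # whether the test output is pretty-printed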