# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import re
import time
import tempfile
import platform

import simplejson

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

# Check whether the simplejson module supports indentation
_JSON_INDENT = 2
try:
  simplejson.dumps(1, indent=_JSON_INDENT)
except TypeError:
  _JSON_INDENT = None


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a
    dict containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    will be handled in the hooks runner. Also note that additional
    keys will be added by the hooks runner. If the LU doesn't define
    any environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires a
    cluster to run on (otherwise we don't have a node list). "No
    nodes" should be expressed as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
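

# A minimal sketch of the LogicalUnit contract described above (illustrative
# only; the class name and the "message" opcode field are hypothetical, not
# part of this module):
#
#   class LUHypotheticalNoOp(LogicalUnit):
#     HPATH = "hypothetical-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["message"]          # checked by LogicalUnit.__init__
#
#     def BuildHooksEnv(self):
#       return {"OP_TARGET": self.op.message}, [], []
#
#     def CheckPrereq(self):
#       pass                          # canonicalize self.op fields here
#
#     def Exec(self, feedback_fn):
#       feedback_fn("noop: %s" % self.op.message)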


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []
    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)
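

# Illustrative sketch (hypothetical names, not executed): the helper both
# validates and canonicalizes the node list carried by an opcode:
#
#   _GetWantedNodes(lu, ["node1", "node2"])  # -> ["node1.example.com", ...]
#   _GetWantedNodes(lu, [])                  # -> all nodes, nice-sorted
#   _GetWantedNodes(lu, None)                # -> OpPrereqError (not a list)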


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
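

# Illustrative sketch (not executed at import time): the helper only checks
# set membership, so any unknown field name ends up in the error message:
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])   # returns quietly
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "bogus"])   # raises OpPrereqError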


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
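

# Usage sketch (hypothetical): the override dict replaces individual keyword
# arguments before they are passed on, e.g. to advertise the memory an
# instance is being changed to rather than its current value:
#
#   env = _BuildInstanceHookEnvByObject(instance, override={'memory': 512})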


def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False
  save_lines = []
  add_lines = []
  removed = False

  for rawline in f:
    logger.Debug('read %s' % (repr(rawline),))

    parts = rawline.rstrip('\r\n').split()

    # Ignore unwanted lines
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
      fields = parts[0].split(',')
      key = parts[2]

      haveall = True
      havesome = False
      for spec in [ip, fullnode]:
        if spec not in fields:
          haveall = False
        if spec in fields:
          havesome = True

      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
      if haveall and key == pubkey:
        inthere = True
        save_lines.append(rawline)
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
        continue

      if havesome and (not haveall or key != pubkey):
        removed = True
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
        continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


def _InitSSHSetup(node):
  """Set up the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Set up the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    # checks ssh to other nodes
    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          instance not in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
          }
      except ValueError:
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass
976 """Verify integrity of cluster disks.
979 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
981 vg_name = self.cfg.GetVGName()
982 nodes = utils.NiceSort(self.cfg.GetNodeList())
983 instances = [self.cfg.GetInstanceInfo(name)
984 for name in self.cfg.GetInstanceList()]
987 for inst in instances:
989 if (inst.status != "up" or
990 inst.disk_template not in constants.DTS_NET_MIRROR):
992 inst.MapLVsByNode(inst_lvs)
993 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
994 for node, vol_list in inst_lvs.iteritems():
996 nv_dict[(node, vol)] = inst
1001 node_lvs = rpc.call_volume_list(nodes, vg_name)
1006 lvs = node_lvs[node]
1008 if isinstance(lvs, basestring):
1009 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1010 res_nlvm[node] = lvs
1011 elif not isinstance(lvs, dict):
1012 logger.Info("connection to node %s failed or invalid data returned" %
1014 res_nodes.append(node)
1017 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1018 inst = nv_dict.pop((node, lv_name), None)
1019 if (not lv_online and inst is not None
1020 and inst.name not in res_instances):
1021 res_instances.append(inst.name)
1023 # any leftover items in nv_dict are missing LVs, let's arrange the
1025 for key, inst in nv_dict.iteritems():
1026 if inst.name not in res_missing:
1027 res_missing[inst.name] = []
1028 res_missing[inst.name].append(key)
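

# Shape of the LUVerifyDisks result, as built above (a sketch; sample values
# are hypothetical): a 4-tuple of
#   res_nodes     - nodes that could not be contacted, e.g. ["node3"]
#   res_nlvm      - per-node LVM error messages, e.g. {"node2": "..."}
#   res_instances - instances with offline LVs, e.g. ["inst1"]
#   res_missing   - instance name -> [(node, lv_name), ...] for missing LVs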


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
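

# A note on the mirror-status tuples consumed above (sketch, hypothetical
# values): each rstats entry is (perc_done, est_time, is_degraded, ldisk),
# and the last field is deliberately ignored here. A device at 42.5% with an
# 80-second estimate would arrive as (42.5, 80, True, False) and log:
#   - device sda: 42.50% done, 80 estimated seconds remaining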


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data is False:
      raise errors.OpExecError("Can't gather the list of OSes")
    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the node exists in the configuration
    - it does not have primary or secondary instances
    - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the new node is not already in the config
    - it is resolvable
    - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: '%s'" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    time.sleep(4)

    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i)
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    data = []
    for node in self.nodes:
      result = ssh.SSHCall(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data
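

# The returned list pairs each node with the remote command's result, e.g.
# (hypothetical values): [("node1", "ok\n", 0), ("node2", "", 127)].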


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
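

# A sketch of a successful return value for a one-disk instance (names are
# hypothetical): (True, [("node1", "sda", "/dev/drbd0")]), i.e. disks are OK,
# plus the (node, instance-visible name, node-visible device) mapping.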


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
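

# Usage sketch (hypothetical values): callers describe what the memory is
# for, so the error message is self-explanatory, e.g.:
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)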


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name
2187 def Exec(self, feedback_fn):
2188 """Reboot the instance.
2191 instance = self.instance
2192 ignore_secondaries = self.op.ignore_secondaries
2193 reboot_type = self.op.reboot_type
2194 extra_args = getattr(self.op, "extra_args", "")
2196 node_current = instance.primary_node
2198 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2199 constants.INSTANCE_REBOOT_HARD,
2200 constants.INSTANCE_REBOOT_FULL]:
2201 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2202 (constants.INSTANCE_REBOOT_SOFT,
2203 constants.INSTANCE_REBOOT_HARD,
2204 constants.INSTANCE_REBOOT_FULL))
2206 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2207 constants.INSTANCE_REBOOT_HARD]:
2208 if not rpc.call_instance_reboot(node_current, instance,
2209 reboot_type, extra_args):
2210 raise errors.OpExecError("Could not reboot instance")
2212 if not rpc.call_instance_shutdown(node_current, instance):
2213 raise errors.OpExecError("could not shutdown instance for full reboot")
2214 _ShutdownInstanceDisks(instance, self.cfg)
2215 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2216 if not rpc.call_instance_start(node_current, instance, extra_args):
2217 _ShutdownInstanceDisks(instance, self.cfg)
2218 raise errors.OpExecError("Could not start instance for full reboot")
2220 self.cfg.MarkInstanceUp(instance.name)
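# Illustrative sketch (assumption, not original code): the reboot type is
# picked from the three constants validated in Exec above, e.g.:
#
#   op = opcodes.OpRebootInstance(instance_name="web1.example.com",
#                                 reboot_type=constants.INSTANCE_REBOOT_SOFT,
#                                 ignore_secondaries=False)

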
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable LV name.

  This will generate a unique logical volume name for each of the
  given extensions.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


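# Example (illustrative; the ID shown is made up): each extension in
# 'exts' gets its own cluster-unique prefix, so
#   _GenerateUniqueNames(cfg, [".sda", ".sdb"])
# might return something like
#   ["02e3b5a0-16e7-4f78-8f9c-5b3a2a4e9d11.sda", "<another-id>.sdb"]

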
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta])
  return drbd_dev


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


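# Resulting device tree (editor's comment, derived from the code above):
#
#   drbd8, logical_id=(primary, secondary, port), iv_name="sda"/"sdb"
#    |- data LV, size=size, named names[0]
#    `- meta LV, size=128,  named names[1]

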
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_LOCAL_RAID1:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
                                       ".sdb_m1", ".sdb_m2"])
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[0]))
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[1]))
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              size=disk_sz,
                              children=[sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[2]))
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[3]))
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              size=swap_sz,
                              children=[sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_REMOTE_RAID1:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2])
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              children=[drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4])
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              children=[drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


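# Hedged usage sketch (values are hypothetical): building the layout for
# a plain (LVM-only) instance with a 10 GiB data disk and 4 GiB swap:
#
#   disks = _GenerateDiskTemplate(self.cfg, constants.DT_PLAIN,
#                                 "web1.example.com", pnode_name,
#                                 [], 10240, 4096)

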
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


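# For an instance named "web1.example.com" the function above returns
# the string "originstname+web1.example.com".

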
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False
  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
                                     primary_node=self.op.pnode,
                                     secondary_nodes=self.secondaries,
                                     status=self.instance_status,
                                     os_type=self.op.os_type,
                                     memory=self.op.mem_size,
                                     vcpus=self.op.vcpus,
                                     nics=[(self.inst_ip, self.op.bridge,
                                            self.op.mac)],
                                     ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: None,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
    }

    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError("Disk template '%s' size requirement"
                                   " is unknown" % self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # instance verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


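# Hedged example (not original code): the opcode driving this LU carries
# at least the fields in _OP_REQP; a minimal DRBD8 creation request might
# look like this (all names and sizes are hypothetical):
#
#   op = opcodes.OpCreateInstance(instance_name="web1.example.com",
#                                 mem_size=512, disk_size=10240,
#                                 swap_size=4096, vcpus=1,
#                                 disk_template=constants.DT_DRBD8,
#                                 pnode="node1.example.com",
#                                 snode="node2.example.com",
#                                 mode=constants.INSTANCE_CREATE,
#                                 start=True, wait_for_sync=True,
#                                 ip_check=True, mac="auto")

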
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)
    # build ssh cmdline
    argv = ["ssh", "-q", "-t"]
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
    argv.extend(ssh.BATCH_MODE_OPTS)
    argv.append(node)
    argv.append(console_cmd)
    return "ssh", argv


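# Editor's note (sketch, not original code): Exec returns a (program,
# argv) pair rather than running the command itself; the caller on the
# master is expected to exec it, along the lines of:
#
#   prog, argv = <result of submitting the console opcode>
#   os.execvp(prog, argv)

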
class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave devices."
                                 " This would create a 3-disk raid1 which we"
                                 " don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component.

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)

    logger.Info("adding new mirror component on secondary")
    #HARDCODE
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
                                      new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    #HARDCODE
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
                                    instance, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(instance.primary_node,
                                         disk, [new_drbd]):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance, self.proc)

    return 0


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if (child.dev_type == constants.LD_DRBD7 and
          child.logical_id[2] == self.op.disk_id):
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component.

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechildren(instance.primary_node,
                                            disk, [child]):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

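  # Editor's summary (comment only, derived from CheckPrereq above): for
  # DRBD8 the node roles after the checks are:
  #   REPLACE_DISK_PRI -> tgt_node = primary,   oth_node = secondary
  #   REPLACE_DISK_SEC -> tgt_node = secondary, oth_node = primary,
  #                       new_node = remote_node (which may be None)
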
  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      #HARDCODE
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      #HARDCODE
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced-<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced-<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node == instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

3907 def _ExecD8Secondary(self, feedback_fn):
3908 """Replace the secondary node for drbd8.
3910 The algorithm for replace is quite complicated:
3911 - for all disks of the instance:
3912 - create new LVs on the new node with same names
3913 - shutdown the drbd device on the old secondary
3914 - disconnect the drbd network on the primary
3915 - create the drbd device on the new secondary
3916 - network attach the drbd on the primary, using an artifice:
3917 the drbd code for Attach() will connect to the network if it
3918 finds a device which is connected to the good local disks but
3920 - wait for sync across all devices
3921 - remove all disks from the old secondary
3923 Failures are not very well handled.
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        "hvm_boot_order": instance.hvm_boot_order,
        }

      result[instance.name] = idict
    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl
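
  # Illustrative example (hypothetical values): modifying memory and the
  # bridge would pass override args like
  #   {'memory': 512,
  #    'nics': [("198.51.100.10", "xen-br0", "aa:00:00:11:22:33")]}
  # to _BuildInstanceHookEnvByObject, which merges them over the
  # instance's current values when building the hooks environment.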

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification: strip() removes any leading/trailing run
    # of valid letters, so only a string made up entirely of [acdn]
    # strips down to the empty string (e.g. "dc" passes, "xda" fails)
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result
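
  # Illustrative sketch (hypothetical values): changing memory and vcpus
  # returns a change list such as [("mem", 512), ("vcpus", 2)], which
  # the caller can print back to the user.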


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
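
  # Illustrative sketch of the returned mapping (hypothetical names):
  #   {"node1.example.com": ["instance1.example.com"],
  #    "node2.example.com": []}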


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()
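
  # Hypothetical usage sketch (assuming the corresponding OpGetTags
  # opcode mirrors _OP_REQP above): submitting
  #   opcodes.OpGetTags(kind=constants.TAG_INSTANCE, name="instance1")
  # returns the tag set of that instance.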


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
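
  # Illustrative result sketch (hypothetical tags): searching for "web"
  # might return [("/cluster", "web-farm"),
  #               ("/instances/instance1.example.com", "webserver")].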


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """
    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


def _AllocatorGetClusterData(cfg, sstore):
  """Compute the generic allocator input data.

  This is the data that is independent of the actual operation.

  """
  # cluster data
  data = {
    "cluster_name": sstore.GetClusterName(),
    "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
    # we don't have job IDs
    }

  # node data
  node_results = {}
  node_list = cfg.GetNodeList()
  node_data = rpc.call_node_info(node_list, cfg.GetVGName())
  for nname in node_list:
    ninfo = cfg.GetNodeInfo(nname)
    if nname not in node_data or not isinstance(node_data[nname], dict):
      raise errors.OpExecError("Can't get data for node %s" % nname)
    remote_info = node_data[nname]
    for attr in ['memory_total', 'memory_free',
                 'vg_size', 'vg_free']:
      if attr not in remote_info:
        raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                 (nname, attr))
      try:
        int(remote_info[attr])
      except ValueError, err:
        raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                 " %s" % (nname, attr, str(err)))
    pnr = {
      "tags": list(ninfo.GetTags()),
      "total_memory": utils.TryConvert(int, remote_info['memory_total']),
      "free_memory": utils.TryConvert(int, remote_info['memory_free']),
      "total_disk": utils.TryConvert(int, remote_info['vg_size']),
      "free_disk": utils.TryConvert(int, remote_info['vg_free']),
      "primary_ip": ninfo.primary_ip,
      "secondary_ip": ninfo.secondary_ip,
      }
    node_results[nname] = pnr
  data["nodes"] = node_results

  # instance data
  instance_data = {}
  i_list = cfg.GetInstanceList()
  for iname in i_list:
    iinfo = cfg.GetInstanceInfo(iname)
    nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                for n in iinfo.nics]
    pir = {
      "tags": list(iinfo.GetTags()),
      "should_run": iinfo.status == "up",
      "vcpus": iinfo.vcpus,
      "memory": iinfo.memory,
      "os": iinfo.os,
      "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
      "nics": nic_data,
      "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
      "disk_template": iinfo.disk_template,
      }
    instance_data[iname] = pir

  data["instances"] = instance_data

  return data


def _AllocatorAddNewInstance(data, op):
  """Add new instance data to allocator structure.

  This in combination with _AllocatorGetClusterData will create the
  correct structure needed as input for the allocator.

  The checks for the completeness of the opcode must have already been
  done.

  """
  request = {
    "type": "allocate",
    "name": op.name,
    "disk_template": op.disk_template,
    "tags": op.tags,
    "os": op.os,
    "vcpus": op.vcpus,
    "memory": op.mem_size,
    "disks": op.disks,
    "nics": op.nics,
    }
  data["request"] = request


def _AllocatorAddRelocateInstance(data, op):
  """Add relocate instance data to allocator structure.

  This in combination with _AllocatorGetClusterData will create the
  correct structure needed as input for the allocator.

  The checks for the completeness of the opcode must have already been
  done.

  """
  request = {
    "type": "replace_secondary",
    "name": op.name,
    }
  data["request"] = request


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and
    mode of the test.

    """
    if self.op.mode == constants.ALF_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.ALF_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.ALF_DIR_OUT:
      if not hasattr(self.op, "allocator"):
        raise errors.OpPrereqError("Missing allocator name")
      raise errors.OpPrereqError("Allocator out mode not supported yet")
    elif self.op.direction != constants.ALF_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    data = _AllocatorGetClusterData(self.cfg, self.sstore)
    if self.op.mode == constants.ALF_MODE_ALLOC:
      _AllocatorAddNewInstance(data, self.op)
    else:
      _AllocatorAddRelocateInstance(data, self.op)

    if _JSON_INDENT is None:
      text = simplejson.dumps(data)
    else:
      text = simplejson.dumps(data, indent=_JSON_INDENT)