4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
47 class LogicalUnit(object):
48 """Logical Unit base class.
50 Subclasses must follow these rules:
51 - implement CheckPrereq which also fills in the opcode instance
52 with all the fields (even if as None)
54 - implement BuildHooksEnv
55 - redefine HPATH and HTYPE
56 - optionally redefine their run requirements (REQ_CLUSTER,
57 REQ_MASTER); note that all commands require root permissions
66 def __init__(self, processor, op, cfg, sstore):
67 """Constructor for LogicalUnit.
69 This needs to be overridden in derived classes in order to check op
77 for attr_name in self._OP_REQP:
78 attr_val = getattr(op, attr_name, None)
80 raise errors.OpPrereqError("Required parameter '%s' missing" %
83 if not cfg.IsCluster():
84 raise errors.OpPrereqError("Cluster not initialized yet,"
85 " use 'gnt-cluster init' first.")
87 master = sstore.GetMasterNode()
88 if master != utils.HostInfo().name:
89 raise errors.OpPrereqError("Commands must be run on the master"
92 def CheckPrereq(self):
93 """Check prerequisites for this LU.
95 This method should check that the prerequisites for the execution
96 of this LU are fulfilled. It can do internode communication, but
97 it should be idempotent - no cluster or system changes are
100 The method should raise errors.OpPrereqError in case something is
101 not fulfilled. Its return value is ignored.
103 This method should also update all the parameters of the opcode to
104 their canonical form; e.g. a short node name must be fully
105 expanded after this method has successfully completed (so that
106 hooks, logging, etc. work correctly).
109 raise NotImplementedError
111 def Exec(self, feedback_fn):
114 This method should implement the actual work. It should raise
115 errors.OpExecError for failures that are somewhat dealt with in
119 raise NotImplementedError
121 def BuildHooksEnv(self):
122 """Build hooks environment for this LU.
124 This method should return a three-element tuple consisting of: a dict
125 containing the environment that will be used for running the
126 specific hook for this LU, a list of node names on which the hook
127 should run before the execution, and a list of node names on which
128 the hook should run after the execution.
130 The keys of the dict must not be prefixed with 'GANETI_', as that
131 prefix is added by the hooks runner. Also note additional keys will be
132 added by the hooks runner. If the LU doesn't define any
133 environment, an empty dict (and not None) should be returned.
135 As for the node lists, the master should not be included in
136 them, as it will be added by the hooks runner in case this LU
137 requires a cluster to run on (otherwise we don't have a node
138 list). If there are no nodes, an empty list should be returned (and not
141 Note that if the HPATH for a LU class is None, this function will
145 raise NotImplementedError
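# As an illustration only (a sketch using attribute names from the LUs
# defined further below), a minimal BuildHooksEnv for an instance-related
# LU could look like:
#
#   def BuildHooksEnv(self):
#     env = {"OP_TARGET": self.op.instance_name}
#     nl = [self.sstore.GetMasterNode(), self.instance.primary_node]
#     return env, nl, nl
#
# i.e. the environment dict plus the pre- and post-execution node lists.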
148 class NoHooksLU(LogicalUnit):
149 """Simple LU which runs no hooks.
151 This LU is intended as a parent for other LogicalUnits which will
152 run no hooks, in order to reduce duplicate code.
158 def BuildHooksEnv(self):
161 This is a no-op, since we don't run hooks.
167 def _AddHostToEtcHosts(hostname):
168 """Wrapper around utils.SetEtcHostsEntry.
171 hi = utils.HostInfo(name=hostname)
172 utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
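# The resulting /etc/hosts entry has the form (illustrative values only):
#   192.0.2.10 node1.example.com node1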
175 def _RemoveHostFromEtcHosts(hostname):
176 """Wrapper around utils.RemoveEtcHostsEntry.
179 hi = utils.HostInfo(name=hostname)
180 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
181 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
184 def _GetWantedNodes(lu, nodes):
185 """Returns list of checked and expanded node names.
188 nodes: List of nodes (strings) or None for all
191 if not isinstance(nodes, list):
192 raise errors.OpPrereqError("Invalid argument type 'nodes'")
198 node = lu.cfg.ExpandNodeName(name)
200 raise errors.OpPrereqError("No such node name '%s'" % name)
204 wanted = lu.cfg.GetNodeList()
205 return utils.NiceSort(wanted)
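# Example (illustrative names): _GetWantedNodes(lu, ["node1"]) returns the
# expanded, nicely-sorted list, e.g. ["node1.example.com"], while an empty
# list selects every node known to the configuration.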
208 def _GetWantedInstances(lu, instances):
209 """Returns list of checked and expanded instance names.
212 instances: List of instances (strings) or None for all
215 if not isinstance(instances, list):
216 raise errors.OpPrereqError("Invalid argument type 'instances'")
221 for name in instances:
222 instance = lu.cfg.ExpandInstanceName(name)
224 raise errors.OpPrereqError("No such instance name '%s'" % name)
225 wanted.append(instance)
228 wanted = lu.cfg.GetInstanceList()
229 return utils.NiceSort(wanted)
232 def _CheckOutputFields(static, dynamic, selected):
233 """Checks whether all selected fields are valid.
236 static: Static fields
237 dynamic: Dynamic fields
240 static_fields = frozenset(static)
241 dynamic_fields = frozenset(dynamic)
243 all_fields = static_fields | dynamic_fields
245 if not all_fields.issuperset(selected):
246 raise errors.OpPrereqError("Unknown output fields selected: %s"
247 % ",".join(frozenset(selected).
248 difference(all_fields)))
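# Usage sketch (mirroring the query LUs further down): a LU validates the
# user-requested columns with
#   _CheckOutputFields(static=["name"], dynamic=["dfree"],
#                      selected=self.op.output_fields)
# and any field outside the union of the two sets raises OpPrereqError.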
251 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
252 memory, vcpus, nics):
253 """Builds instance related env variables for hooks from single variables.
256 secondary_nodes: List of secondary nodes as strings
260 "INSTANCE_NAME": name,
261 "INSTANCE_PRIMARY": primary_node,
262 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
263 "INSTANCE_OS_TYPE": os_type,
264 "INSTANCE_STATUS": status,
265 "INSTANCE_MEMORY": memory,
266 "INSTANCE_VCPUS": vcpus,
270 nic_count = len(nics)
271 for idx, (ip, bridge, mac) in enumerate(nics):
274 env["INSTANCE_NIC%d_IP" % idx] = ip
275 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
276 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
280 env["INSTANCE_NIC_COUNT"] = nic_count
285 def _BuildInstanceHookEnvByObject(instance, override=None):
286 """Builds instance related env variables for hooks from an object.
289 instance: objects.Instance object of instance
290 override: dict of values to override
293 'name': instance.name,
294 'primary_node': instance.primary_node,
295 'secondary_nodes': instance.secondary_nodes,
296 'os_type': instance.os,
297 'status': instance.status,
298 'memory': instance.memory,
299 'vcpus': instance.vcpus,
300 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
303 args.update(override)
304 return _BuildInstanceHookEnv(**args)
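# Usage sketch (hypothetical override): a caller that needs to report a
# value different from the stored object can pass it explicitly, e.g.
#   _BuildInstanceHookEnvByObject(instance, override={'status': 'up'})
# and only that key is replaced before the environment is built.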
307 def _UpdateKnownHosts(fullnode, ip, pubkey):
308 """Ensure a node has a correct known_hosts entry.
311 fullnode - Fully qualified domain name of host. (str)
312 ip - IPv4 address of host (str)
313 pubkey - the public key of the cluster
316 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
317 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
319 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
328 logger.Debug('read %s' % (repr(rawline),))
330 parts = rawline.rstrip('\r\n').split()
332 # Ignore unwanted lines
333 if len(parts) >= 3 and rawline.lstrip()[0] != '#':
334 fields = parts[0].split(',')
339 for spec in [ ip, fullnode ]:
340 if spec not in fields:
345 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
346 if haveall and key == pubkey:
348 save_lines.append(rawline)
349 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
352 if havesome and (not haveall or key != pubkey):
354 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
357 save_lines.append(rawline)
360 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
361 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
364 save_lines = save_lines + add_lines
366 # Write a new file and replace old.
367 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
369 newfile = os.fdopen(fd, 'w')
371 newfile.write(''.join(save_lines))
374 logger.Debug("Wrote new known_hosts.")
375 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
378 # Simply appending a new line will do the trick.
380 for add in add_lines:
386 def _HasValidVG(vglist, vgname):
387 """Checks if the volume group list is valid.
389 A non-None return value means there's an error, and the return value
390 is the error message.
393 vgsize = vglist.get(vgname, None)
395 return "volume group '%s' missing" % vgname
397 return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
402 def _InitSSHSetup(node):
403 """Setup the SSH configuration for the cluster.
406 This generates a dsa keypair for root, adds the pub key to the
407 permitted hosts and adds the hostkey to its own known hosts.
410 node: the name of this host as an FQDN
413 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
415 for name in priv_key, pub_key:
416 if os.path.exists(name):
417 utils.CreateBackup(name)
418 utils.RemoveFile(name)
420 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
424 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
427 f = open(pub_key, 'r')
429 utils.AddAuthorizedKey(auth_keys, f.read(8192))
434 def _InitGanetiServerSetup(ss):
435 """Setup the necessary configuration for the initial node daemon.
437 This creates the nodepass file containing the shared password for
438 the cluster and also generates the SSL certificate.
441 # Create pseudo random password
442 randpass = sha.new(os.urandom(64)).hexdigest()
443 # and write it into sstore
444 ss.SetKey(ss.SS_NODED_PASS, randpass)
446 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
447 "-days", str(365*5), "-nodes", "-x509",
448 "-keyout", constants.SSL_CERT_FILE,
449 "-out", constants.SSL_CERT_FILE, "-batch"])
451 raise errors.OpExecError("could not generate server ssl cert, command"
452 " %s had exitcode %s and error message %s" %
453 (result.cmd, result.exit_code, result.output))
455 os.chmod(constants.SSL_CERT_FILE, 0400)
457 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
460 raise errors.OpExecError("Could not start the node daemon, command %s"
461 " had exitcode %s and error %s" %
462 (result.cmd, result.exit_code, result.output))
465 def _CheckInstanceBridgesExist(instance):
466 """Check that the brigdes needed by an instance exist.
469 # check bridges existance
470 brlist = [nic.bridge for nic in instance.nics]
471 if not rpc.call_bridges_exist(instance.primary_node, brlist):
472 raise errors.OpPrereqError("one or more target bridges %s do not"
473 " exist on destination node '%s'" %
474 (brlist, instance.primary_node))
477 class LUInitCluster(LogicalUnit):
478 """Initialise the cluster.
481 HPATH = "cluster-init"
482 HTYPE = constants.HTYPE_CLUSTER
483 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
484 "def_bridge", "master_netdev"]
487 def BuildHooksEnv(self):
490 Notes: Since we don't require a cluster, we must manually add
491 ourselves in the post-run node list.
494 env = {"OP_TARGET": self.op.cluster_name}
495 return env, [], [self.hostname.name]
497 def CheckPrereq(self):
498 """Verify that the passed name is a valid one.
501 if config.ConfigWriter.IsCluster():
502 raise errors.OpPrereqError("Cluster is already initialised")
504 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
505 if not os.path.exists(constants.VNC_PASSWORD_FILE):
506 raise errors.OpPrereqError("Please prepare the cluster VNC"
508 constants.VNC_PASSWORD_FILE)
510 self.hostname = hostname = utils.HostInfo()
512 if hostname.ip.startswith("127."):
513 raise errors.OpPrereqError("This host's IP resolves to the private"
514 " range (%s). Please fix DNS or %s." %
515 (hostname.ip, constants.ETC_HOSTS))
517 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
519 if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
520 constants.DEFAULT_NODED_PORT):
521 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
522 " to %s,\nbut this ip address does not"
523 " belong to this host."
524 " Aborting." % hostname.ip)
526 secondary_ip = getattr(self.op, "secondary_ip", None)
527 if secondary_ip and not utils.IsValidIP(secondary_ip):
528 raise errors.OpPrereqError("Invalid secondary ip given")
530 secondary_ip != hostname.ip and
531 (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
532 constants.DEFAULT_NODED_PORT))):
533 raise errors.OpPrereqError("You gave %s as secondary IP,"
534 " but it does not belong to this host." %
536 self.secondary_ip = secondary_ip
538 # checks presence of the volume group given
539 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
542 raise errors.OpPrereqError("Error: %s" % vgstatus)
544 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
546 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
549 if self.op.hypervisor_type not in constants.HYPER_TYPES:
550 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
551 self.op.hypervisor_type)
553 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
555 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
556 (self.op.master_netdev,
557 result.output.strip()))
559 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
560 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
561 raise errors.OpPrereqError("Init.d script '%s' missing or not"
562 " executable." % constants.NODE_INITD_SCRIPT)
564 def Exec(self, feedback_fn):
565 """Initialize the cluster.
568 clustername = self.clustername
569 hostname = self.hostname
571 # set up the simple store
572 self.sstore = ss = ssconf.SimpleStore()
573 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
574 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
575 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
576 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
577 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
579 # set up the inter-node password and certificate
580 _InitGanetiServerSetup(ss)
582 # start the master ip
583 rpc.call_node_start_master(hostname.name)
585 # set up ssh config and /etc/hosts
586 f = open(constants.SSH_HOST_RSA_PUB, 'r')
591 sshkey = sshline.split(" ")[1]
593 _AddHostToEtcHosts(hostname.name)
595 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
597 _InitSSHSetup(hostname.name)
599 # init of cluster config file
600 self.cfg = cfgw = config.ConfigWriter()
601 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
602 sshkey, self.op.mac_prefix,
603 self.op.vg_name, self.op.def_bridge)
606 class LUDestroyCluster(NoHooksLU):
607 """Logical unit for destroying the cluster.
612 def CheckPrereq(self):
613 """Check prerequisites.
615 This checks whether the cluster is empty.
617 Any errors are signalled by raising errors.OpPrereqError.
620 master = self.sstore.GetMasterNode()
622 nodelist = self.cfg.GetNodeList()
623 if len(nodelist) != 1 or nodelist[0] != master:
624 raise errors.OpPrereqError("There are still %d node(s) in"
625 " this cluster." % (len(nodelist) - 1))
626 instancelist = self.cfg.GetInstanceList()
628 raise errors.OpPrereqError("There are still %d instance(s) in"
629 " this cluster." % len(instancelist))
631 def Exec(self, feedback_fn):
632 """Destroys the cluster.
635 master = self.sstore.GetMasterNode()
636 if not rpc.call_node_stop_master(master):
637 raise errors.OpExecError("Could not disable the master role")
638 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
639 utils.CreateBackup(priv_key)
640 utils.CreateBackup(pub_key)
641 rpc.call_node_leave_cluster(master)
644 class LUVerifyCluster(NoHooksLU):
645 """Verifies the cluster status.
650 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
651 remote_version, feedback_fn):
652 """Run multiple tests against a node.
655 - compares ganeti version
656 - checks vg existence and size > 20G
657 - checks config file checksum
658 - checks ssh to other nodes
661 node: name of the node to check
662 file_list: required list of files
663 local_cksum: dictionary of local files and their checksums
666 # compares ganeti version
667 local_version = constants.PROTOCOL_VERSION
668 if not remote_version:
669 feedback_fn(" - ERROR: connection to %s failed" % (node))
672 if local_version != remote_version:
673 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
674 (local_version, node, remote_version))
677 # checks vg existence and size > 20G
681 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
685 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
687 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
690 # checks config file checksum
693 if 'filelist' not in node_result:
695 feedback_fn(" - ERROR: node hasn't returned file checksum data")
697 remote_cksum = node_result['filelist']
698 for file_name in file_list:
699 if file_name not in remote_cksum:
701 feedback_fn(" - ERROR: file '%s' missing" % file_name)
702 elif remote_cksum[file_name] != local_cksum[file_name]:
704 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
706 if 'nodelist' not in node_result:
708 feedback_fn(" - ERROR: node hasn't returned node connectivity data")
710 if node_result['nodelist']:
712 for a_node in node_result['nodelist']:
713 feedback_fn(" - ERROR: communication with node '%s': %s" %
714 (a_node, node_result['nodelist'][a_node]))
715 hyp_result = node_result.get('hypervisor', None)
716 if hyp_result is not None:
717 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
720 def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
721 """Verify an instance.
723 This function checks to see if the required block devices are
724 available on the instance's node.
729 instancelist = self.cfg.GetInstanceList()
730 if instance not in instancelist:
731 feedback_fn(" - ERROR: instance %s not in instance list %s" %
732 (instance, instancelist))
735 instanceconfig = self.cfg.GetInstanceInfo(instance)
736 node_current = instanceconfig.primary_node
739 instanceconfig.MapLVsByNode(node_vol_should)
741 for node in node_vol_should:
742 for volume in node_vol_should[node]:
743 if node not in node_vol_is or volume not in node_vol_is[node]:
744 feedback_fn(" - ERROR: volume %s missing on node %s" %
748 if instanceconfig.status != 'down':
749 if instance not in node_instance[node_current]:
750 feedback_fn(" - ERROR: instance %s not running on node %s" %
751 (instance, node_current))
754 for node in node_instance:
755 if node != node_current:
756 if instance in node_instance[node]:
757 feedback_fn(" - ERROR: instance %s should not run on node %s" %
763 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
764 """Verify if there are any unknown volumes in the cluster.
766 The .os, .swap and backup volumes are ignored. All other volumes are
772 for node in node_vol_is:
773 for volume in node_vol_is[node]:
774 if node not in node_vol_should or volume not in node_vol_should[node]:
775 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
780 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
781 """Verify the list of running instances.
783 This checks what instances are running but unknown to the cluster.
787 for node in node_instance:
788 for runninginstance in node_instance[node]:
789 if runninginstance not in instancelist:
790 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
791 (runninginstance, node))
795 def CheckPrereq(self):
796 """Check prerequisites.
798 This has no prerequisites.
803 def Exec(self, feedback_fn):
804 """Verify integrity of cluster, performing various test on nodes.
808 feedback_fn("* Verifying global settings")
809 for msg in self.cfg.VerifyConfig():
810 feedback_fn(" - ERROR: %s" % msg)
812 vg_name = self.cfg.GetVGName()
813 nodelist = utils.NiceSort(self.cfg.GetNodeList())
814 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
818 # FIXME: verify OS list
820 file_names = list(self.sstore.GetFileList())
821 file_names.append(constants.SSL_CERT_FILE)
822 file_names.append(constants.CLUSTER_CONF_FILE)
823 local_checksums = utils.FingerprintFiles(file_names)
825 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
826 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
827 all_instanceinfo = rpc.call_instance_list(nodelist)
828 all_vglist = rpc.call_vg_list(nodelist)
829 node_verify_param = {
830 'filelist': file_names,
831 'nodelist': nodelist,
834 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
835 all_rversion = rpc.call_version(nodelist)
837 for node in nodelist:
838 feedback_fn("* Verifying node %s" % node)
839 result = self._VerifyNode(node, file_names, local_checksums,
840 all_vglist[node], all_nvinfo[node],
841 all_rversion[node], feedback_fn)
845 volumeinfo = all_volumeinfo[node]
847 if isinstance(volumeinfo, basestring):
848 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
849 (node, volumeinfo[-400:].encode('string_escape')))
851 node_volume[node] = {}
852 elif not isinstance(volumeinfo, dict):
853 feedback_fn(" - ERROR: connection to %s failed" % (node,))
857 node_volume[node] = volumeinfo
860 nodeinstance = all_instanceinfo[node]
861 if not isinstance(nodeinstance, list):
862 feedback_fn(" - ERROR: connection to %s failed" % (node,))
866 node_instance[node] = nodeinstance
870 for instance in instancelist:
871 feedback_fn("* Verifying instance %s" % instance)
872 result = self._VerifyInstance(instance, node_volume, node_instance,
876 inst_config = self.cfg.GetInstanceInfo(instance)
878 inst_config.MapLVsByNode(node_vol_should)
880 feedback_fn("* Verifying orphan volumes")
881 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
885 feedback_fn("* Verifying remaining instances")
886 result = self._VerifyOrphanInstances(instancelist, node_instance,
893 class LUVerifyDisks(NoHooksLU):
894 """Verifies the cluster disks status.
899 def CheckPrereq(self):
900 """Check prerequisites.
902 This has no prerequisites.
907 def Exec(self, feedback_fn):
908 """Verify integrity of cluster disks.
911 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
913 vg_name = self.cfg.GetVGName()
914 nodes = utils.NiceSort(self.cfg.GetNodeList())
915 instances = [self.cfg.GetInstanceInfo(name)
916 for name in self.cfg.GetInstanceList()]
919 for inst in instances:
921 if (inst.status != "up" or
922 inst.disk_template not in constants.DTS_NET_MIRROR):
924 inst.MapLVsByNode(inst_lvs)
925 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
926 for node, vol_list in inst_lvs.iteritems():
928 nv_dict[(node, vol)] = inst
933 node_lvs = rpc.call_volume_list(nodes, vg_name)
940 if isinstance(lvs, basestring):
941 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
943 elif not isinstance(lvs, dict):
944 logger.Info("connection to node %s failed or invalid data returned" %
946 res_nodes.append(node)
949 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
950 inst = nv_dict.pop((node, lv_name), None)
951 if (not lv_online and inst is not None
952 and inst.name not in res_instances):
953 res_instances.append(inst.name)
955 # any leftover items in nv_dict are missing LVs, let's arrange the
957 for key, inst in nv_dict.iteritems():
958 if inst.name not in res_missing:
959 res_missing[inst.name] = []
960 res_missing[inst.name].append(key)
965 class LURenameCluster(LogicalUnit):
966 """Rename the cluster.
969 HPATH = "cluster-rename"
970 HTYPE = constants.HTYPE_CLUSTER
973 def BuildHooksEnv(self):
978 "OP_TARGET": self.sstore.GetClusterName(),
979 "NEW_NAME": self.op.name,
981 mn = self.sstore.GetMasterNode()
982 return env, [mn], [mn]
984 def CheckPrereq(self):
985 """Verify that the passed name is a valid one.
988 hostname = utils.HostInfo(self.op.name)
990 new_name = hostname.name
991 self.ip = new_ip = hostname.ip
992 old_name = self.sstore.GetClusterName()
993 old_ip = self.sstore.GetMasterIP()
994 if new_name == old_name and new_ip == old_ip:
995 raise errors.OpPrereqError("Neither the name nor the IP address of the"
996 " cluster has changed")
998 result = utils.RunCmd(["fping", "-q", new_ip])
999 if not result.failed:
1000 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1001 " reachable on the network. Aborting." %
1004 self.op.name = new_name
1006 def Exec(self, feedback_fn):
1007 """Rename the cluster.
1010 clustername = self.op.name
1014 # shutdown the master IP
1015 master = ss.GetMasterNode()
1016 if not rpc.call_node_stop_master(master):
1017 raise errors.OpExecError("Could not disable the master role")
1021 ss.SetKey(ss.SS_MASTER_IP, ip)
1022 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1024 # Distribute updated ss config to all nodes
1025 myself = self.cfg.GetNodeInfo(master)
1026 dist_nodes = self.cfg.GetNodeList()
1027 if myself.name in dist_nodes:
1028 dist_nodes.remove(myself.name)
1030 logger.Debug("Copying updated ssconf data to all nodes")
1031 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1032 fname = ss.KeyToFilename(keyname)
1033 result = rpc.call_upload_file(dist_nodes, fname)
1034 for to_node in dist_nodes:
1035 if not result[to_node]:
1036 logger.Error("copy of file %s to node %s failed" %
1039 if not rpc.call_node_start_master(master):
1040 logger.Error("Could not re-enable the master role on the master,"
1041 " please restart manually.")
1044 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1045 """Sleep and poll for an instance's disk to sync.
1048 if not instance.disks:
1052 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1054 node = instance.primary_node
1056 for dev in instance.disks:
1057 cfgw.SetDiskID(dev, node)
1063 cumul_degraded = False
1064 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1066 proc.LogWarning("Can't get any data from node %s" % node)
1069 raise errors.RemoteError("Can't contact node %s for mirror data,"
1070 " aborting." % node)
1074 for i in range(len(rstats)):
1077 proc.LogWarning("Can't compute data for node %s/%s" %
1078 (node, instance.disks[i].iv_name))
1080 # we ignore the ldisk parameter
1081 perc_done, est_time, is_degraded, _ = mstat
1082 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1083 if perc_done is not None:
1085 if est_time is not None:
1086 rem_time = "%d estimated seconds remaining" % est_time
1089 rem_time = "no time estimate"
1090 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1091 (instance.disks[i].iv_name, perc_done, rem_time))
1098 time.sleep(min(60, max_time))
1104 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1105 return not cumul_degraded
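# Usage sketch (assuming the calling LU keeps its processor as self.proc):
#   if not _WaitForSync(self.cfg, instance, self.proc):
#     raise errors.OpExecError("Disk sync-out has failed")
# i.e. a False return value means at least one mirror is still degraded.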
1108 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1109 """Check that mirrors are not degraded.
1111 The ldisk parameter, if True, will change the test from the
1112 is_degraded attribute (which represents overall non-ok status for
1113 the device(s)) to the ldisk (representing the local storage status).
1116 cfgw.SetDiskID(dev, node)
1123 if on_primary or dev.AssembleOnSecondary():
1124 rstats = rpc.call_blockdev_find(node, dev)
1126 logger.ToStderr("Can't get any data from node %s" % node)
1129 result = result and (not rstats[idx])
1131 for child in dev.children:
1132 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1137 class LUDiagnoseOS(NoHooksLU):
1138 """Logical unit for OS diagnose/query.
1143 def CheckPrereq(self):
1144 """Check prerequisites.
1146 This always succeeds, since this is a pure query LU.
1151 def Exec(self, feedback_fn):
1152 """Compute the list of OSes.
1155 node_list = self.cfg.GetNodeList()
1156 node_data = rpc.call_os_diagnose(node_list)
1157 if node_data is False:
1158 raise errors.OpExecError("Can't gather the list of OSes")
1162 class LURemoveNode(LogicalUnit):
1163 """Logical unit for removing a node.
1166 HPATH = "node-remove"
1167 HTYPE = constants.HTYPE_NODE
1168 _OP_REQP = ["node_name"]
1170 def BuildHooksEnv(self):
1173 This doesn't run on the target node in the pre phase as a failed
1174 node would not allow itself to run.
1178 "OP_TARGET": self.op.node_name,
1179 "NODE_NAME": self.op.node_name,
1181 all_nodes = self.cfg.GetNodeList()
1182 all_nodes.remove(self.op.node_name)
1183 return env, all_nodes, all_nodes
1185 def CheckPrereq(self):
1186 """Check prerequisites.
1189 - the node exists in the configuration
1190 - it does not have primary or secondary instances
1191 - it's not the master
1193 Any errors are signalled by raising errors.OpPrereqError.
1196 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1198 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1200 instance_list = self.cfg.GetInstanceList()
1202 masternode = self.sstore.GetMasterNode()
1203 if node.name == masternode:
1204 raise errors.OpPrereqError("Node is the master node,"
1205 " you need to failover first.")
1207 for instance_name in instance_list:
1208 instance = self.cfg.GetInstanceInfo(instance_name)
1209 if node.name == instance.primary_node:
1210 raise errors.OpPrereqError("Instance %s still running on the node,"
1211 " please remove first." % instance_name)
1212 if node.name in instance.secondary_nodes:
1213 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1214 " please remove first." % instance_name)
1215 self.op.node_name = node.name
1218 def Exec(self, feedback_fn):
1219 """Removes the node from the cluster.
1223 logger.Info("stopping the node daemon and removing configs from node %s" %
1226 rpc.call_node_leave_cluster(node.name)
1228 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1230 logger.Info("Removing node %s from config" % node.name)
1232 self.cfg.RemoveNode(node.name)
1234 _RemoveHostFromEtcHosts(node.name)
1237 class LUQueryNodes(NoHooksLU):
1238 """Logical unit for querying nodes.
1241 _OP_REQP = ["output_fields", "names"]
1243 def CheckPrereq(self):
1244 """Check prerequisites.
1246 This checks that the fields required are valid output fields.
1249 self.dynamic_fields = frozenset(["dtotal", "dfree",
1250 "mtotal", "mnode", "mfree",
1253 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1254 "pinst_list", "sinst_list",
1256 dynamic=self.dynamic_fields,
1257 selected=self.op.output_fields)
1259 self.wanted = _GetWantedNodes(self, self.op.names)
1261 def Exec(self, feedback_fn):
1262 """Computes the list of nodes and their attributes.
1265 nodenames = self.wanted
1266 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1268 # begin data gathering
1270 if self.dynamic_fields.intersection(self.op.output_fields):
1272 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1273 for name in nodenames:
1274 nodeinfo = node_data.get(name, None)
1277 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1278 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1279 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1280 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1281 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1282 "bootid": nodeinfo['bootid'],
1285 live_data[name] = {}
1287 live_data = dict.fromkeys(nodenames, {})
1289 node_to_primary = dict([(name, set()) for name in nodenames])
1290 node_to_secondary = dict([(name, set()) for name in nodenames])
1292 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1293 "sinst_cnt", "sinst_list"))
1294 if inst_fields & frozenset(self.op.output_fields):
1295 instancelist = self.cfg.GetInstanceList()
1297 for instance_name in instancelist:
1298 inst = self.cfg.GetInstanceInfo(instance_name)
1299 if inst.primary_node in node_to_primary:
1300 node_to_primary[inst.primary_node].add(inst.name)
1301 for secnode in inst.secondary_nodes:
1302 if secnode in node_to_secondary:
1303 node_to_secondary[secnode].add(inst.name)
1305 # end data gathering
1308 for node in nodelist:
1310 for field in self.op.output_fields:
1313 elif field == "pinst_list":
1314 val = list(node_to_primary[node.name])
1315 elif field == "sinst_list":
1316 val = list(node_to_secondary[node.name])
1317 elif field == "pinst_cnt":
1318 val = len(node_to_primary[node.name])
1319 elif field == "sinst_cnt":
1320 val = len(node_to_secondary[node.name])
1321 elif field == "pip":
1322 val = node.primary_ip
1323 elif field == "sip":
1324 val = node.secondary_ip
1325 elif field in self.dynamic_fields:
1326 val = live_data[node.name].get(field, None)
1328 raise errors.ParameterError(field)
1329 node_output.append(val)
1330 output.append(node_output)
1335 class LUQueryNodeVolumes(NoHooksLU):
1336 """Logical unit for getting volumes on node(s).
1339 _OP_REQP = ["nodes", "output_fields"]
1341 def CheckPrereq(self):
1342 """Check prerequisites.
1344 This checks that the fields required are valid output fields.
1347 self.nodes = _GetWantedNodes(self, self.op.nodes)
1349 _CheckOutputFields(static=["node"],
1350 dynamic=["phys", "vg", "name", "size", "instance"],
1351 selected=self.op.output_fields)
1354 def Exec(self, feedback_fn):
1355 """Computes the list of nodes and their attributes.
1358 nodenames = self.nodes
1359 volumes = rpc.call_node_volumes(nodenames)
1361 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1362 in self.cfg.GetInstanceList()]
1364 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1367 for node in nodenames:
1368 if node not in volumes or not volumes[node]:
1371 node_vols = volumes[node][:]
1372 node_vols.sort(key=lambda vol: vol['dev'])
1374 for vol in node_vols:
1376 for field in self.op.output_fields:
1379 elif field == "phys":
1383 elif field == "name":
1385 elif field == "size":
1386 val = int(float(vol['size']))
1387 elif field == "instance":
1389 if node not in lv_by_node[inst]:
1391 if vol['name'] in lv_by_node[inst][node]:
1397 raise errors.ParameterError(field)
1398 node_output.append(str(val))
1400 output.append(node_output)
1405 class LUAddNode(LogicalUnit):
1406 """Logical unit for adding node to the cluster.
1410 HTYPE = constants.HTYPE_NODE
1411 _OP_REQP = ["node_name"]
1413 def BuildHooksEnv(self):
1416 This will run on all nodes before, and on all nodes + the new node after.
1420 "OP_TARGET": self.op.node_name,
1421 "NODE_NAME": self.op.node_name,
1422 "NODE_PIP": self.op.primary_ip,
1423 "NODE_SIP": self.op.secondary_ip,
1425 nodes_0 = self.cfg.GetNodeList()
1426 nodes_1 = nodes_0 + [self.op.node_name, ]
1427 return env, nodes_0, nodes_1
1429 def CheckPrereq(self):
1430 """Check prerequisites.
1433 - the new node is not already in the config
1435 - its parameters (single/dual homed) match the cluster
1437 Any errors are signalled by raising errors.OpPrereqError.
1440 node_name = self.op.node_name
1443 dns_data = utils.HostInfo(node_name)
1445 node = dns_data.name
1446 primary_ip = self.op.primary_ip = dns_data.ip
1447 secondary_ip = getattr(self.op, "secondary_ip", None)
1448 if secondary_ip is None:
1449 secondary_ip = primary_ip
1450 if not utils.IsValidIP(secondary_ip):
1451 raise errors.OpPrereqError("Invalid secondary IP given")
1452 self.op.secondary_ip = secondary_ip
1453 node_list = cfg.GetNodeList()
1454 if node in node_list:
1455 raise errors.OpPrereqError("Node %s is already in the configuration"
1458 for existing_node_name in node_list:
1459 existing_node = cfg.GetNodeInfo(existing_node_name)
1460 if (existing_node.primary_ip == primary_ip or
1461 existing_node.secondary_ip == primary_ip or
1462 existing_node.primary_ip == secondary_ip or
1463 existing_node.secondary_ip == secondary_ip):
1464 raise errors.OpPrereqError("New node ip address(es) conflict with"
1465 " existing node %s" % existing_node.name)
1467 # check that the type of the node (single versus dual homed) is the
1468 # same as for the master
1469 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1470 master_singlehomed = myself.secondary_ip == myself.primary_ip
1471 newbie_singlehomed = secondary_ip == primary_ip
1472 if master_singlehomed != newbie_singlehomed:
1473 if master_singlehomed:
1474 raise errors.OpPrereqError("The master has no private ip but the"
1475 " new node has one")
1477 raise errors.OpPrereqError("The master has a private ip but the"
1478 " new node doesn't have one")
1480 # checks reachability
1481 if not utils.TcpPing(utils.HostInfo().name,
1483 constants.DEFAULT_NODED_PORT):
1484 raise errors.OpPrereqError("Node not reachable by ping")
1486 if not newbie_singlehomed:
1487 # check reachability from my secondary ip to newbie's secondary ip
1488 if not utils.TcpPing(myself.secondary_ip,
1490 constants.DEFAULT_NODED_PORT):
1491 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1492 " based ping to noded port")
1494 self.new_node = objects.Node(name=node,
1495 primary_ip=primary_ip,
1496 secondary_ip=secondary_ip)
1498 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1499 if not os.path.exists(constants.VNC_PASSWORD_FILE):
1500 raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1501 constants.VNC_PASSWORD_FILE)
1503 def Exec(self, feedback_fn):
1504 """Adds the new node to the cluster.
1507 new_node = self.new_node
1508 node = new_node.name
1510 # set up inter-node password and certificate and restarts the node daemon
1511 gntpass = self.sstore.GetNodeDaemonPassword()
1512 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1513 raise errors.OpExecError("ganeti password corruption detected")
1514 f = open(constants.SSL_CERT_FILE)
1516 gntpem = f.read(8192)
1519 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1520 # so we use this to detect an invalid certificate; as long as the
1521 # cert doesn't contain this, the here-document will be correctly
1522 # parsed by the shell sequence below
1523 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1524 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1525 if not gntpem.endswith("\n"):
1526 raise errors.OpExecError("PEM must end with newline")
1527 logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1529 # and then connect with ssh to set password and start ganeti-noded
1530 # note that all the below variables are sanitized at this point,
1531 # either by being constants or by the checks above
1533 mycommand = ("umask 077 && "
1534 "echo '%s' > '%s' && "
1535 "cat > '%s' << '!EOF.' && \n"
1536 "%s!EOF.\n%s restart" %
1537 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1538 constants.SSL_CERT_FILE, gntpem,
1539 constants.NODE_INITD_SCRIPT))
1541 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1543 raise errors.OpExecError("Remote command on node %s, error: %s,"
1545 (node, result.fail_reason, result.output))
1547 # check connectivity
1550 result = rpc.call_version([node])[node]
1552 if constants.PROTOCOL_VERSION == result:
1553 logger.Info("communication to node %s fine, sw version %s match" %
1556 raise errors.OpExecError("Version mismatch master version %s,"
1557 " node version %s" %
1558 (constants.PROTOCOL_VERSION, result))
1560 raise errors.OpExecError("Cannot get version from the new node")
1563 logger.Info("copy ssh key to node %s" % node)
1564 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1566 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1567 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1573 keyarray.append(f.read())
1577 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1578 keyarray[3], keyarray[4], keyarray[5])
1581 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1583 # Add node to our /etc/hosts, and add key to known_hosts
1584 _AddHostToEtcHosts(new_node.name)
1586 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1587 self.cfg.GetHostKey())
1589 if new_node.secondary_ip != new_node.primary_ip:
1590 if not rpc.call_node_tcp_ping(new_node.name,
1591 constants.LOCALHOST_IP_ADDRESS,
1592 new_node.secondary_ip,
1593 constants.DEFAULT_NODED_PORT,
1595 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1596 " you gave (%s). Please fix and re-run this"
1597 " command." % new_node.secondary_ip)
1599 success, msg = ssh.VerifyNodeHostname(node)
1601 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1602 " than the one the resolver gives: %s."
1603 " Please fix and re-run this command." %
1606 # Distribute updated /etc/hosts and known_hosts to all nodes,
1607 # including the node just added
1608 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1609 dist_nodes = self.cfg.GetNodeList() + [node]
1610 if myself.name in dist_nodes:
1611 dist_nodes.remove(myself.name)
1613 logger.Debug("Copying hosts and known_hosts to all nodes")
1614 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1615 result = rpc.call_upload_file(dist_nodes, fname)
1616 for to_node in dist_nodes:
1617 if not result[to_node]:
1618 logger.Error("copy of file %s to node %s failed" %
1621 to_copy = ss.GetFileList()
1622 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1623 to_copy.append(constants.VNC_PASSWORD_FILE)
1624 for fname in to_copy:
1625 if not ssh.CopyFileToNode(node, fname):
1626 logger.Error("could not copy file %s to node %s" % (fname, node))
1628 logger.Info("adding node %s to cluster.conf" % node)
1629 self.cfg.AddNode(new_node)
1632 class LUMasterFailover(LogicalUnit):
1633 """Failover the master node to the current node.
1635 This is a special LU in that it must run on a non-master node.
1638 HPATH = "master-failover"
1639 HTYPE = constants.HTYPE_CLUSTER
1643 def BuildHooksEnv(self):
1646 This will run on the new master only in the pre phase, and on all
1647 the nodes in the post phase.
1651 "OP_TARGET": self.new_master,
1652 "NEW_MASTER": self.new_master,
1653 "OLD_MASTER": self.old_master,
1655 return env, [self.new_master], self.cfg.GetNodeList()
1657 def CheckPrereq(self):
1658 """Check prerequisites.
1660 This checks that we are not already the master.
1663 self.new_master = utils.HostInfo().name
1664 self.old_master = self.sstore.GetMasterNode()
1666 if self.old_master == self.new_master:
1667 raise errors.OpPrereqError("This command must be run on the node"
1668 " where you want the new master to be."
1669 " %s is already the master" %
1672 def Exec(self, feedback_fn):
1673 """Failover the master node.
1675 This command, when run on a non-master node, will cause the current
1676 master to cease being master, and the non-master to become new
1680 #TODO: do not rely on gethostname returning the FQDN
1681 logger.Info("setting master to %s, old master: %s" %
1682 (self.new_master, self.old_master))
1684 if not rpc.call_node_stop_master(self.old_master):
1685 logger.Error("could not disable the master role on the old master"
1686 " %s, please disable manually" % self.old_master)
1689 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1690 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1691 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1692 logger.Error("could not distribute the new simple store master file"
1693 " to the other nodes, please check.")
1695 if not rpc.call_node_start_master(self.new_master):
1696 logger.Error("could not start the master role on the new master"
1697 " %s, please check" % self.new_master)
1698 feedback_fn("Error in activating the master IP on the new master,"
1699 " please fix manually.")
1703 class LUQueryClusterInfo(NoHooksLU):
1704 """Query cluster configuration.
1710 def CheckPrereq(self):
1711 """No prerequsites needed for this LU.
1716 def Exec(self, feedback_fn):
1717 """Return cluster config.
1721 "name": self.sstore.GetClusterName(),
1722 "software_version": constants.RELEASE_VERSION,
1723 "protocol_version": constants.PROTOCOL_VERSION,
1724 "config_version": constants.CONFIG_VERSION,
1725 "os_api_version": constants.OS_API_VERSION,
1726 "export_version": constants.EXPORT_VERSION,
1727 "master": self.sstore.GetMasterNode(),
1728 "architecture": (platform.architecture()[0], platform.machine()),
1734 class LUClusterCopyFile(NoHooksLU):
1735 """Copy file to cluster.
1738 _OP_REQP = ["nodes", "filename"]
1740 def CheckPrereq(self):
1741 """Check prerequisites.
1743 It should check that the named file exists and that the given list
1747 if not os.path.exists(self.op.filename):
1748 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1750 self.nodes = _GetWantedNodes(self, self.op.nodes)
1752 def Exec(self, feedback_fn):
1753 """Copy a file from master to some nodes.
1756 opts - class with options as members
1757 args - list containing a single element, the file name
1759 nodes - list containing the name of target nodes; if empty, all nodes
1762 filename = self.op.filename
1764 myname = utils.HostInfo().name
1766 for node in self.nodes:
1769 if not ssh.CopyFileToNode(node, filename):
1770 logger.Error("Copy of file %s to node %s failed" % (filename, node))
1773 class LUDumpClusterConfig(NoHooksLU):
1774 """Return a text-representation of the cluster-config.
1779 def CheckPrereq(self):
1780 """No prerequisites.
1785 def Exec(self, feedback_fn):
1786 """Dump a representation of the cluster config to the standard output.
1789 return self.cfg.DumpConfig()
1792 class LURunClusterCommand(NoHooksLU):
1793 """Run a command on some nodes.
1796 _OP_REQP = ["command", "nodes"]
1798 def CheckPrereq(self):
1799 """Check prerequisites.
1801 It checks that the given list of nodes is valid.
1804 self.nodes = _GetWantedNodes(self, self.op.nodes)
1806 def Exec(self, feedback_fn):
1807 """Run a command on some nodes.
1811 for node in self.nodes:
1812 result = ssh.SSHCall(node, "root", self.op.command)
1813 data.append((node, result.output, result.exit_code))
1818 class LUActivateInstanceDisks(NoHooksLU):
1819 """Bring up an instance's disks.
1822 _OP_REQP = ["instance_name"]
1824 def CheckPrereq(self):
1825 """Check prerequisites.
1827 This checks that the instance is in the cluster.
1830 instance = self.cfg.GetInstanceInfo(
1831 self.cfg.ExpandInstanceName(self.op.instance_name))
1832 if instance is None:
1833 raise errors.OpPrereqError("Instance '%s' not known" %
1834 self.op.instance_name)
1835 self.instance = instance
1838 def Exec(self, feedback_fn):
1839 """Activate the disks.
1842 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1844 raise errors.OpExecError("Cannot activate block devices")
1849 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1850 """Prepare the block devices for an instance.
1852 This sets up the block devices on all nodes.
1855 instance: a ganeti.objects.Instance object
1856 ignore_secondaries: if true, errors on secondary nodes won't result
1857 in an error return from the function
1860 false if the operation failed
1861 list of (host, instance_visible_name, node_visible_name) if the operation
1862 succeeded with the mapping from node devices to instance devices
1866 iname = instance.name
1867 # With the two-pass mechanism we try to reduce the window of
1868 # opportunity for the race condition of switching DRBD to primary
1869 # before handshaking has occurred, but we do not eliminate it
1871 # The proper fix would be to wait (with some limits) until the
1872 # connection has been made and drbd transitions from WFConnection
1873 # into any other network-connected state (Connected, SyncTarget,
1876 # 1st pass, assemble on all nodes in secondary mode
1877 for inst_disk in instance.disks:
1878 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1879 cfg.SetDiskID(node_disk, node)
1880 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1882 logger.Error("could not prepare block device %s on node %s"
1883 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1884 if not ignore_secondaries:
1887 # FIXME: race condition on drbd migration to primary
1889 # 2nd pass, do only the primary node
1890 for inst_disk in instance.disks:
1891 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1892 if node != instance.primary_node:
1894 cfg.SetDiskID(node_disk, node)
1895 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1897 logger.Error("could not prepare block device %s on node %s"
1898 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1900 device_info.append((instance.primary_node, inst_disk.iv_name, result))
1902 # leave the disks configured for the primary node
1903 # this is a workaround that would be fixed better by
1904 # improving the logical/physical id handling
1905 for disk in instance.disks:
1906 cfg.SetDiskID(disk, instance.primary_node)
1908 return disks_ok, device_info
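# Sketch of the return value (made-up device names): on success this is
#   (True, [("node1", "sda", "/dev/drbd0"), ("node1", "sdb", "/dev/drbd1")])
# i.e. a success flag plus the devices visible on the primary node, as
# consumed by LUActivateInstanceDisks above.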
1911 def _StartInstanceDisks(cfg, instance, force):
1912 """Start the disks of an instance.
1915 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1916 ignore_secondaries=force)
1918 _ShutdownInstanceDisks(instance, cfg)
1919 if force is not None and not force:
1920 logger.Error("If the message above refers to a secondary node,"
1921 " you can retry the operation using '--force'.")
1922 raise errors.OpExecError("Disk consistency error")
1925 class LUDeactivateInstanceDisks(NoHooksLU):
1926 """Shutdown an instance's disks.
1929 _OP_REQP = ["instance_name"]
1931 def CheckPrereq(self):
1932 """Check prerequisites.
1934 This checks that the instance is in the cluster.
1937 instance = self.cfg.GetInstanceInfo(
1938 self.cfg.ExpandInstanceName(self.op.instance_name))
1939 if instance is None:
1940 raise errors.OpPrereqError("Instance '%s' not known" %
1941 self.op.instance_name)
1942 self.instance = instance
1944 def Exec(self, feedback_fn):
1945 """Deactivate the disks
1948 instance = self.instance
1949 ins_l = rpc.call_instance_list([instance.primary_node])
1950 ins_l = ins_l[instance.primary_node]
1951 if not isinstance(ins_l, list):
1952 raise errors.OpExecError("Can't contact node '%s'" %
1953 instance.primary_node)
1955 if self.instance.name in ins_l:
1956 raise errors.OpExecError("Instance is running, can't shutdown"
1959 _ShutdownInstanceDisks(instance, self.cfg)
1962 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1963 """Shutdown block devices of an instance.
1965 This does the shutdown on all nodes of the instance.
1967 If ignore_primary is false, errors on the primary node are
1972 for disk in instance.disks:
1973 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1974 cfg.SetDiskID(top_disk, node)
1975 if not rpc.call_blockdev_shutdown(node, top_disk):
1976 logger.Error("could not shutdown block device %s on node %s" %
1977 (disk.iv_name, node))
1978 if not ignore_primary or node != instance.primary_node:
1983 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1984 """Checks if a node has enough free memory.
1986 This function checks if a given node has the needed amount of free
1987 memory. In case the node has less memory or we cannot get the
1988 information from the node, this function raises an OpPrereqError
1992 - cfg: a ConfigWriter instance
1993 - node: the node name
1994 - reason: string to use in the error message
1995 - requested: the amount of memory in MiB
1998 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1999 if not nodeinfo or not isinstance(nodeinfo, dict):
2000 raise errors.OpPrereqError("Could not contact node %s for resource"
2001 " information" % (node,))
2003 free_mem = nodeinfo[node].get('memory_free')
2004 if not isinstance(free_mem, int):
2005 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2006 " was '%s'" % (node, free_mem))
2007 if requested > free_mem:
2008 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2009 " needed %s MiB, available %s MiB" %
2010 (node, reason, requested, free_mem))
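# Usage sketch: LUStartupInstance below reserves the instance's memory on
# its primary node before starting it, roughly as
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)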
2013 class LUStartupInstance(LogicalUnit):
2014 """Starts an instance.
2017 HPATH = "instance-start"
2018 HTYPE = constants.HTYPE_INSTANCE
2019 _OP_REQP = ["instance_name", "force"]
2021 def BuildHooksEnv(self):
2024 This runs on master, primary and secondary nodes of the instance.
2028 "FORCE": self.op.force,
2030 env.update(_BuildInstanceHookEnvByObject(self.instance))
2031 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2032 list(self.instance.secondary_nodes))
2035 def CheckPrereq(self):
2036 """Check prerequisites.
2038 This checks that the instance is in the cluster.
2041 instance = self.cfg.GetInstanceInfo(
2042 self.cfg.ExpandInstanceName(self.op.instance_name))
2043 if instance is None:
2044 raise errors.OpPrereqError("Instance '%s' not known" %
2045 self.op.instance_name)
2047 # check bridges existence
2048 _CheckInstanceBridgesExist(instance)
2050 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2051 "starting instance %s" % instance.name,
2054 self.instance = instance
2055 self.op.instance_name = instance.name
2057 def Exec(self, feedback_fn):
2058 """Start the instance.
2061 instance = self.instance
2062 force = self.op.force
2063 extra_args = getattr(self.op, "extra_args", "")
2065 node_current = instance.primary_node
2067 _StartInstanceDisks(self.cfg, instance, force)
2069 if not rpc.call_instance_start(node_current, instance, extra_args):
2070 _ShutdownInstanceDisks(instance, self.cfg)
2071 raise errors.OpExecError("Could not start instance")
2073 self.cfg.MarkInstanceUp(instance.name)
2076 class LURebootInstance(LogicalUnit):
2077 """Reboot an instance.
2080 HPATH = "instance-reboot"
2081 HTYPE = constants.HTYPE_INSTANCE
2082 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2084 def BuildHooksEnv(self):
2087 This runs on master, primary and secondary nodes of the instance.
2091 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2093 env.update(_BuildInstanceHookEnvByObject(self.instance))
2094 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2095 list(self.instance.secondary_nodes))
2098 def CheckPrereq(self):
2099 """Check prerequisites.
2101 This checks that the instance is in the cluster.
2104 instance = self.cfg.GetInstanceInfo(
2105 self.cfg.ExpandInstanceName(self.op.instance_name))
2106 if instance is None:
2107 raise errors.OpPrereqError("Instance '%s' not known" %
2108 self.op.instance_name)
2110 # check bridges existence
2111 _CheckInstanceBridgesExist(instance)
2113 self.instance = instance
2114 self.op.instance_name = instance.name
2116 def Exec(self, feedback_fn):
2117 """Reboot the instance.
2120 instance = self.instance
2121 ignore_secondaries = self.op.ignore_secondaries
2122 reboot_type = self.op.reboot_type
2123 extra_args = getattr(self.op, "extra_args", "")
2125 node_current = instance.primary_node
2127 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2128 constants.INSTANCE_REBOOT_HARD,
2129 constants.INSTANCE_REBOOT_FULL]:
2130 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2131 (constants.INSTANCE_REBOOT_SOFT,
2132 constants.INSTANCE_REBOOT_HARD,
2133 constants.INSTANCE_REBOOT_FULL))
2135 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2136 constants.INSTANCE_REBOOT_HARD]:
2137 if not rpc.call_instance_reboot(node_current, instance,
2138 reboot_type, extra_args):
2139 raise errors.OpExecError("Could not reboot instance")
2141 if not rpc.call_instance_shutdown(node_current, instance):
2142 raise errors.OpExecError("could not shutdown instance for full reboot")
2143 _ShutdownInstanceDisks(instance, self.cfg)
2144 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2145 if not rpc.call_instance_start(node_current, instance, extra_args):
2146 _ShutdownInstanceDisks(instance, self.cfg)
2147 raise errors.OpExecError("Could not start instance for full reboot")
2149 self.cfg.MarkInstanceUp(instance.name)
2152 class LUShutdownInstance(LogicalUnit):
2153 """Shutdown an instance.
2156 HPATH = "instance-stop"
2157 HTYPE = constants.HTYPE_INSTANCE
2158 _OP_REQP = ["instance_name"]
2160 def BuildHooksEnv(self):
2163 This runs on master, primary and secondary nodes of the instance.
2166 env = _BuildInstanceHookEnvByObject(self.instance)
2167 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2168 list(self.instance.secondary_nodes))
2171 def CheckPrereq(self):
2172 """Check prerequisites.
2174 This checks that the instance is in the cluster.
2177 instance = self.cfg.GetInstanceInfo(
2178 self.cfg.ExpandInstanceName(self.op.instance_name))
2179 if instance is None:
2180 raise errors.OpPrereqError("Instance '%s' not known" %
2181 self.op.instance_name)
2182 self.instance = instance
2184 def Exec(self, feedback_fn):
2185 """Shutdown the instance.
2188 instance = self.instance
2189 node_current = instance.primary_node
2190 if not rpc.call_instance_shutdown(node_current, instance):
2191 logger.Error("could not shutdown instance")
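# even if the hypervisor-level shutdown failed, the instance is still marked
# down and its disks are deactivated below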
2193 self.cfg.MarkInstanceDown(instance.name)
2194 _ShutdownInstanceDisks(instance, self.cfg)
2197 class LUReinstallInstance(LogicalUnit):
2198 """Reinstall an instance.
2201 HPATH = "instance-reinstall"
2202 HTYPE = constants.HTYPE_INSTANCE
2203 _OP_REQP = ["instance_name"]
2205 def BuildHooksEnv(self):
2208 This runs on master, primary and secondary nodes of the instance.
2211 env = _BuildInstanceHookEnvByObject(self.instance)
2212 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2213 list(self.instance.secondary_nodes))
2216 def CheckPrereq(self):
2217 """Check prerequisites.
2219 This checks that the instance is in the cluster and is not running.
2222 instance = self.cfg.GetInstanceInfo(
2223 self.cfg.ExpandInstanceName(self.op.instance_name))
2224 if instance is None:
2225 raise errors.OpPrereqError("Instance '%s' not known" %
2226 self.op.instance_name)
2227 if instance.disk_template == constants.DT_DISKLESS:
2228 raise errors.OpPrereqError("Instance '%s' has no disks" %
2229 self.op.instance_name)
2230 if instance.status != "down":
2231 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2232 self.op.instance_name)
2233 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2235 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2236 (self.op.instance_name,
2237 instance.primary_node))
2239 self.op.os_type = getattr(self.op, "os_type", None)
2240 if self.op.os_type is not None:
2242 pnode = self.cfg.GetNodeInfo(
2243 self.cfg.ExpandNodeName(instance.primary_node))
2245 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2247 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2249 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2250 " primary node" % self.op.os_type)
2252 self.instance = instance
2254 def Exec(self, feedback_fn):
2255 """Reinstall the instance.
2258 inst = self.instance
2260 if self.op.os_type is not None:
2261 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2262 inst.os = self.op.os_type
2263 self.cfg.AddInstance(inst)
2265 _StartInstanceDisks(self.cfg, inst, None)
2267 feedback_fn("Running the instance OS create scripts...")
2268 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2269 raise errors.OpExecError("Could not install OS for instance %s"
2270 " on node %s" %
2271 (inst.name, inst.primary_node))
2273 _ShutdownInstanceDisks(inst, self.cfg)
2276 class LURenameInstance(LogicalUnit):
2277 """Rename an instance.
2280 HPATH = "instance-rename"
2281 HTYPE = constants.HTYPE_INSTANCE
2282 _OP_REQP = ["instance_name", "new_name"]
2284 def BuildHooksEnv(self):
2287 This runs on master, primary and secondary nodes of the instance.
2290 env = _BuildInstanceHookEnvByObject(self.instance)
2291 env["INSTANCE_NEW_NAME"] = self.op.new_name
2292 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2293 list(self.instance.secondary_nodes))
2296 def CheckPrereq(self):
2297 """Check prerequisites.
2299 This checks that the instance is in the cluster and is not running.
2302 instance = self.cfg.GetInstanceInfo(
2303 self.cfg.ExpandInstanceName(self.op.instance_name))
2304 if instance is None:
2305 raise errors.OpPrereqError("Instance '%s' not known" %
2306 self.op.instance_name)
2307 if instance.status != "down":
2308 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2309 self.op.instance_name)
2310 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2312 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2313 (self.op.instance_name,
2314 instance.primary_node))
2315 self.instance = instance
2317 # new name verification
2318 name_info = utils.HostInfo(self.op.new_name)
2320 self.op.new_name = new_name = name_info.name
2321 instance_list = self.cfg.GetInstanceList()
2322 if new_name in instance_list:
2323 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2326 if not getattr(self.op, "ignore_ip", False):
2327 command = ["fping", "-q", name_info.ip]
2328 result = utils.RunCmd(command)
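# a successful fping means something already answers on that IP, so renaming
# the instance to it would introduce an address conflict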
2329 if not result.failed:
2330 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2331 (name_info.ip, new_name))
2334 def Exec(self, feedback_fn):
2335 """Reinstall the instance.
2338 inst = self.instance
2339 old_name = inst.name
2341 self.cfg.RenameInstance(inst.name, self.op.new_name)
2343 # re-read the instance from the configuration after rename
2344 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2346 _StartInstanceDisks(self.cfg, inst, None)
2348 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2350 msg = ("Could not run OS rename script for instance %s on node %s (but the"
2351 " instance has been renamed in Ganeti)" %
2352 (inst.name, inst.primary_node))
2355 _ShutdownInstanceDisks(inst, self.cfg)
2358 class LURemoveInstance(LogicalUnit):
2359 """Remove an instance.
2362 HPATH = "instance-remove"
2363 HTYPE = constants.HTYPE_INSTANCE
2364 _OP_REQP = ["instance_name"]
2366 def BuildHooksEnv(self):
2369 This runs on master, primary and secondary nodes of the instance.
2372 env = _BuildInstanceHookEnvByObject(self.instance)
2373 nl = [self.sstore.GetMasterNode()]
2376 def CheckPrereq(self):
2377 """Check prerequisites.
2379 This checks that the instance is in the cluster.
2382 instance = self.cfg.GetInstanceInfo(
2383 self.cfg.ExpandInstanceName(self.op.instance_name))
2384 if instance is None:
2385 raise errors.OpPrereqError("Instance '%s' not known" %
2386 self.op.instance_name)
2387 self.instance = instance
2389 def Exec(self, feedback_fn):
2390 """Remove the instance.
2393 instance = self.instance
2394 logger.Info("shutting down instance %s on node %s" %
2395 (instance.name, instance.primary_node))
2397 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2398 if self.op.ignore_failures:
2399 feedback_fn("Warning: can't shutdown instance")
2401 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2402 (instance.name, instance.primary_node))
2404 logger.Info("removing block devices for instance %s" % instance.name)
2406 if not _RemoveDisks(instance, self.cfg):
2407 if self.op.ignore_failures:
2408 feedback_fn("Warning: can't remove instance's disks")
2410 raise errors.OpExecError("Can't remove instance's disks")
2412 logger.Info("removing instance %s out of cluster config" % instance.name)
2414 self.cfg.RemoveInstance(instance.name)
2417 class LUQueryInstances(NoHooksLU):
2418 """Logical unit for querying instances.
2421 _OP_REQP = ["output_fields", "names"]
2423 def CheckPrereq(self):
2424 """Check prerequisites.
2426 This checks that the fields required are valid output fields.
2429 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2430 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2431 "admin_state", "admin_ram",
2432 "disk_template", "ip", "mac", "bridge",
2433 "sda_size", "sdb_size", "vcpus"],
2434 dynamic=self.dynamic_fields,
2435 selected=self.op.output_fields)
2437 self.wanted = _GetWantedInstances(self, self.op.names)
2439 def Exec(self, feedback_fn):
2440 """Computes the list of nodes and their attributes.
2443 instance_names = self.wanted
2444 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2447 # begin data gathering
2449 nodes = frozenset([inst.primary_node for inst in instance_list])
2452 if self.dynamic_fields.intersection(self.op.output_fields):
2454 node_data = rpc.call_all_instances_info(nodes)
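# nodes that fail to answer the RPC are collected in bad_nodes; their
# instances are reported further down with an ERROR_nodedown status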
2456 result = node_data[name]
2458 live_data.update(result)
2459 elif result == False:
2460 bad_nodes.append(name)
2461 # else no instance is alive
2463 live_data = dict([(name, {}) for name in instance_names])
2465 # end data gathering
2468 for instance in instance_list:
2470 for field in self.op.output_fields:
2475 elif field == "pnode":
2476 val = instance.primary_node
2477 elif field == "snodes":
2478 val = list(instance.secondary_nodes)
2479 elif field == "admin_state":
2480 val = (instance.status != "down")
2481 elif field == "oper_state":
2482 if instance.primary_node in bad_nodes:
2485 val = bool(live_data.get(instance.name))
2486 elif field == "status":
2487 if instance.primary_node in bad_nodes:
2488 val = "ERROR_nodedown"
2490 running = bool(live_data.get(instance.name))
2492 if instance.status != "down":
2497 if instance.status != "down":
2501 elif field == "admin_ram":
2502 val = instance.memory
2503 elif field == "oper_ram":
2504 if instance.primary_node in bad_nodes:
2506 elif instance.name in live_data:
2507 val = live_data[instance.name].get("memory", "?")
2510 elif field == "disk_template":
2511 val = instance.disk_template
2513 val = instance.nics[0].ip
2514 elif field == "bridge":
2515 val = instance.nics[0].bridge
2516 elif field == "mac":
2517 val = instance.nics[0].mac
2518 elif field == "sda_size" or field == "sdb_size":
2519 disk = instance.FindDisk(field[:3])
2524 elif field == "vcpus":
2525 val = instance.vcpus
2527 raise errors.ParameterError(field)
2534 class LUFailoverInstance(LogicalUnit):
2535 """Failover an instance.
2538 HPATH = "instance-failover"
2539 HTYPE = constants.HTYPE_INSTANCE
2540 _OP_REQP = ["instance_name", "ignore_consistency"]
2542 def BuildHooksEnv(self):
2545 This runs on master, primary and secondary nodes of the instance.
2549 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2551 env.update(_BuildInstanceHookEnvByObject(self.instance))
2552 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2555 def CheckPrereq(self):
2556 """Check prerequisites.
2558 This checks that the instance is in the cluster.
2561 instance = self.cfg.GetInstanceInfo(
2562 self.cfg.ExpandInstanceName(self.op.instance_name))
2563 if instance is None:
2564 raise errors.OpPrereqError("Instance '%s' not known" %
2565 self.op.instance_name)
2567 if instance.disk_template not in constants.DTS_NET_MIRROR:
2568 raise errors.OpPrereqError("Instance's disk layout is not"
2569 " network mirrored, cannot failover.")
2571 secondary_nodes = instance.secondary_nodes
2572 if not secondary_nodes:
2573 raise errors.ProgrammerError("no secondary node but using "
2574 "DT_REMOTE_RAID1 template")
2576 target_node = secondary_nodes[0]
2577 # check memory requirements on the secondary node
2578 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2579 instance.name, instance.memory)
2581 # check bridge existance
2582 brlist = [nic.bridge for nic in instance.nics]
2583 if not rpc.call_bridges_exist(target_node, brlist):
2584 raise errors.OpPrereqError("One or more target bridges %s do not"
2585 " exist on destination node '%s'" %
2586 (brlist, target_node))
2588 self.instance = instance
2590 def Exec(self, feedback_fn):
2591 """Failover an instance.
2593 The failover is done by shutting it down on its present node and
2594 starting it on the secondary.
2597 instance = self.instance
2599 source_node = instance.primary_node
2600 target_node = instance.secondary_nodes[0]
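# overall flow: verify disk consistency, stop the instance on the current
# primary, flip the primary node in the configuration, then bring the disks
# and the instance back up on the former secondary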
2602 feedback_fn("* checking disk consistency between source and target")
2603 for dev in instance.disks:
2604 # for remote_raid1, these are md over drbd
2605 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2606 if not self.op.ignore_consistency:
2607 raise errors.OpExecError("Disk %s is degraded on target node,"
2608 " aborting failover." % dev.iv_name)
2610 feedback_fn("* shutting down instance on source node")
2611 logger.Info("Shutting down instance %s on node %s" %
2612 (instance.name, source_node))
2614 if not rpc.call_instance_shutdown(source_node, instance):
2615 if self.op.ignore_consistency:
2616 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2617 " anyway. Please make sure node %s is down" %
2618 (instance.name, source_node, source_node))
2620 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2621 (instance.name, source_node))
2623 feedback_fn("* deactivating the instance's disks on source node")
2624 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2625 raise errors.OpExecError("Can't shut down the instance's disks.")
2627 instance.primary_node = target_node
2628 # distribute new instance config to the other nodes
2629 self.cfg.AddInstance(instance)
2631 feedback_fn("* activating the instance's disks on target node")
2632 logger.Info("Starting instance %s on node %s" %
2633 (instance.name, target_node))
2635 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2636 ignore_secondaries=True)
2638 _ShutdownInstanceDisks(instance, self.cfg)
2639 raise errors.OpExecError("Can't activate the instance's disks")
2641 feedback_fn("* starting the instance on the target node")
2642 if not rpc.call_instance_start(target_node, instance, None):
2643 _ShutdownInstanceDisks(instance, self.cfg)
2644 raise errors.OpExecError("Could not start instance %s on node %s." %
2645 (instance.name, target_node))
2648 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2649 """Create a tree of block devices on the primary node.
2651 This always creates all devices.
2655 for child in device.children:
2656 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2659 cfg.SetDiskID(device, node)
2660 new_id = rpc.call_blockdev_create(node, device, device.size,
2661 instance.name, True, info)
2664 if device.physical_id is None:
2665 device.physical_id = new_id
2669 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2670 """Create a tree of block devices on a secondary node.
2672 If this device type has to be created on secondaries, create it and
2675 If not, just recurse to children keeping the same 'force' value.
2678 if device.CreateOnSecondary():
2681 for child in device.children:
2682 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2683 child, force, info):
2688 cfg.SetDiskID(device, node)
2689 new_id = rpc.call_blockdev_create(node, device, device.size,
2690 instance.name, False, info)
2693 if device.physical_id is None:
2694 device.physical_id = new_id
2698 def _GenerateUniqueNames(cfg, exts):
2699 """Generate a suitable LV name.
2701 This will generate a logical volume name for the given instance.
2706 new_id = cfg.GenerateUniqueID()
2707 results.append("%s%s" % (new_id, val))
2711 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2712 """Generate a drbd device complete with its children.
2715 port = cfg.AllocatePort()
2716 vgname = cfg.GetVGName()
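# the branch is a DRBD7 device whose children are one data LV of the
# requested size and a fixed 128 MB metadata LV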
2717 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2718 logical_id=(vgname, names[0]))
2719 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2720 logical_id=(vgname, names[1]))
2721 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2722 logical_id = (primary, secondary, port),
2723 children = [dev_data, dev_meta])
2727 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2728 """Generate a drbd8 device complete with its children.
2731 port = cfg.AllocatePort()
2732 vgname = cfg.GetVGName()
2733 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2734 logical_id=(vgname, names[0]))
2735 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2736 logical_id=(vgname, names[1]))
2737 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2738 logical_id = (primary, secondary, port),
2739 children = [dev_data, dev_meta],
2744 def _GenerateDiskTemplate(cfg, template_name,
2745 instance_name, primary_node,
2746 secondary_nodes, disk_sz, swap_sz):
2747 """Generate the entire disk layout for a given template type.
2750 #TODO: compute space requirements
2752 vgname = cfg.GetVGName()
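# handled templates: diskless (no disks), plain (one LV per device),
# local_raid1 (MD RAID1 over two local LVs), remote_raid1 (MD over DRBD7)
# and drbd8 (DRBD8 backed by local LVs on both nodes).
# A rough sketch of what the plain branch below produces (hypothetical
# volume group and generated names, for illustration only):
#   disks = [objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
#                         logical_id=("xenvg", "<uuid>.sda")),
#            objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
#                         logical_id=("xenvg", "<uuid>.sdb"))]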
2753 if template_name == constants.DT_DISKLESS:
2755 elif template_name == constants.DT_PLAIN:
2756 if len(secondary_nodes) != 0:
2757 raise errors.ProgrammerError("Wrong template configuration")
2759 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2760 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2761 logical_id=(vgname, names[0]),
2763 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2764 logical_id=(vgname, names[1]),
2766 disks = [sda_dev, sdb_dev]
2767 elif template_name == constants.DT_LOCAL_RAID1:
2768 if len(secondary_nodes) != 0:
2769 raise errors.ProgrammerError("Wrong template configuration")
2772 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2773 ".sdb_m1", ".sdb_m2"])
2774 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2775 logical_id=(vgname, names[0]))
2776 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2777 logical_id=(vgname, names[1]))
2778 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2780 children = [sda_dev_m1, sda_dev_m2])
2781 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2782 logical_id=(vgname, names[2]))
2783 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2784 logical_id=(vgname, names[3]))
2785 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2787 children = [sdb_dev_m1, sdb_dev_m2])
2788 disks = [md_sda_dev, md_sdb_dev]
2789 elif template_name == constants.DT_REMOTE_RAID1:
2790 if len(secondary_nodes) != 1:
2791 raise errors.ProgrammerError("Wrong template configuration")
2792 remote_node = secondary_nodes[0]
2793 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2794 ".sdb_data", ".sdb_meta"])
2795 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2796 disk_sz, names[0:2])
2797 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2798 children = [drbd_sda_dev], size=disk_sz)
2799 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2800 swap_sz, names[2:4])
2801 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2802 children = [drbd_sdb_dev], size=swap_sz)
2803 disks = [md_sda_dev, md_sdb_dev]
2804 elif template_name == constants.DT_DRBD8:
2805 if len(secondary_nodes) != 1:
2806 raise errors.ProgrammerError("Wrong template configuration")
2807 remote_node = secondary_nodes[0]
2808 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2809 ".sdb_data", ".sdb_meta"])
2810 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2811 disk_sz, names[0:2], "sda")
2812 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2813 swap_sz, names[2:4], "sdb")
2814 disks = [drbd_sda_dev, drbd_sdb_dev]
2816 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2820 def _GetInstanceInfoText(instance):
2821 """Compute that text that should be added to the disk's metadata.
2824 return "originstname+%s" % instance.name
2827 def _CreateDisks(cfg, instance):
2828 """Create all disks for an instance.
2830 This abstracts away some work from AddInstance.
2833 instance: the instance object
2836 True or False showing the success of the creation process
2839 info = _GetInstanceInfoText(instance)
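# each disk tree is created on the secondary nodes first and only then on
# the primary node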
2841 for device in instance.disks:
2842 logger.Info("creating volume %s for instance %s" %
2843 (device.iv_name, instance.name))
2845 for secondary_node in instance.secondary_nodes:
2846 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2847 device, False, info):
2848 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2849 (device.iv_name, device, secondary_node))
2852 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2853 instance, device, info):
2854 logger.Error("failed to create volume %s on primary!" %
2860 def _RemoveDisks(instance, cfg):
2861 """Remove all disks for an instance.
2863 This abstracts away some work from `AddInstance()` and
2864 `RemoveInstance()`. Note that in case some of the devices couldn't
2865 be removed, the removal will continue with the other ones (compare
2866 with `_CreateDisks()`).
2869 instance: the instance object
2872 True or False showing the success of the removal process
2875 logger.Info("removing block devices for instance %s" % instance.name)
2878 for device in instance.disks:
2879 for node, disk in device.ComputeNodeTree(instance.primary_node):
2880 cfg.SetDiskID(disk, node)
2881 if not rpc.call_blockdev_remove(node, disk):
2882 logger.Error("could not remove block device %s on node %s,"
2883 " continuing anyway" %
2884 (device.iv_name, node))
2889 class LUCreateInstance(LogicalUnit):
2890 """Create an instance.
2893 HPATH = "instance-add"
2894 HTYPE = constants.HTYPE_INSTANCE
2895 _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2896 "disk_template", "swap_size", "mode", "start", "vcpus",
2897 "wait_for_sync", "ip_check", "mac"]
2899 def BuildHooksEnv(self):
2902 This runs on master, primary and secondary nodes of the instance.
2906 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2907 "INSTANCE_DISK_SIZE": self.op.disk_size,
2908 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2909 "INSTANCE_ADD_MODE": self.op.mode,
2911 if self.op.mode == constants.INSTANCE_IMPORT:
2912 env["INSTANCE_SRC_NODE"] = self.op.src_node
2913 env["INSTANCE_SRC_PATH"] = self.op.src_path
2914 env["INSTANCE_SRC_IMAGE"] = self.src_image
2916 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2917 primary_node=self.op.pnode,
2918 secondary_nodes=self.secondaries,
2919 status=self.instance_status,
2920 os_type=self.op.os_type,
2921 memory=self.op.mem_size,
2922 vcpus=self.op.vcpus,
2923 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2926 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2931 def CheckPrereq(self):
2932 """Check prerequisites.
2935 for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2936 if not hasattr(self.op, attr):
2937 setattr(self.op, attr, None)
2939 if self.op.mode not in (constants.INSTANCE_CREATE,
2940 constants.INSTANCE_IMPORT):
2941 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2944 if self.op.mode == constants.INSTANCE_IMPORT:
2945 src_node = getattr(self.op, "src_node", None)
2946 src_path = getattr(self.op, "src_path", None)
2947 if src_node is None or src_path is None:
2948 raise errors.OpPrereqError("Importing an instance requires source"
2949 " node and path options")
2950 src_node_full = self.cfg.ExpandNodeName(src_node)
2951 if src_node_full is None:
2952 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2953 self.op.src_node = src_node = src_node_full
2955 if not os.path.isabs(src_path):
2956 raise errors.OpPrereqError("The source path must be absolute")
2958 export_info = rpc.call_export_info(src_node, src_path)
2961 raise errors.OpPrereqError("No export found in dir %s" % src_path)
2963 if not export_info.has_section(constants.INISECT_EXP):
2964 raise errors.ProgrammerError("Corrupted export config")
2966 ei_version = export_info.get(constants.INISECT_EXP, 'version')
2967 if (int(ei_version) != constants.EXPORT_VERSION):
2968 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2969 (ei_version, constants.EXPORT_VERSION))
2971 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2972 raise errors.OpPrereqError("Can't import instance with more than"
2975 # FIXME: are the old OSes, disk sizes, etc. useful?
2976 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2977 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2979 self.src_image = diskimage
2980 else: # INSTANCE_CREATE
2981 if getattr(self.op, "os_type", None) is None:
2982 raise errors.OpPrereqError("No guest OS specified")
2984 # check primary node
2985 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2987 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2989 self.op.pnode = pnode.name
2991 self.secondaries = []
2992 # disk template and mirror node verification
2993 if self.op.disk_template not in constants.DISK_TEMPLATES:
2994 raise errors.OpPrereqError("Invalid disk template name")
2996 if self.op.disk_template in constants.DTS_NET_MIRROR:
2997 if getattr(self.op, "snode", None) is None:
2998 raise errors.OpPrereqError("The networked disk templates need"
3001 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3002 if snode_name is None:
3003 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3005 elif snode_name == pnode.name:
3006 raise errors.OpPrereqError("The secondary node cannot be"
3007 " the primary node.")
3008 self.secondaries.append(snode_name)
3010 # Required free disk space as a function of disk and swap space
3012 constants.DT_DISKLESS: None,
3013 constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
3014 constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
3015 # 256 MB are added for drbd metadata, 128 MB for each drbd device
3016 constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
3017 constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
3020 if self.op.disk_template not in req_size_dict:
3021 raise errors.ProgrammerError("Disk template '%s' size requirement"
3022 " is unknown" % self.op.disk_template)
3024 req_size = req_size_dict[self.op.disk_template]
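# a req_size of None (the diskless template) disables the free-space check below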
3026 # Check lv size requirements
3027 if req_size is not None:
3028 nodenames = [pnode.name] + self.secondaries
3029 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3030 for node in nodenames:
3031 info = nodeinfo.get(node, None)
3033 raise errors.OpPrereqError("Cannot get current information"
3034 " from node '%s'" % nodeinfo)
3035 vg_free = info.get('vg_free', None)
3036 if not isinstance(vg_free, int):
3037 raise errors.OpPrereqError("Can't compute free disk space on"
3039 if req_size > info['vg_free']:
3040 raise errors.OpPrereqError("Not enough disk space on target node %s."
3041 " %d MB available, %d MB required" %
3042 (node, info['vg_free'], req_size))
3045 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3047 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3048 " primary node" % self.op.os_type)
3050 if self.op.kernel_path == constants.VALUE_NONE:
3051 raise errors.OpPrereqError("Can't set instance kernel to none")
3053 # instance verification
3054 hostname1 = utils.HostInfo(self.op.instance_name)
3056 self.op.instance_name = instance_name = hostname1.name
3057 instance_list = self.cfg.GetInstanceList()
3058 if instance_name in instance_list:
3059 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3062 ip = getattr(self.op, "ip", None)
3063 if ip is None or ip.lower() == "none":
3065 elif ip.lower() == "auto":
3066 inst_ip = hostname1.ip
3068 if not utils.IsValidIP(ip):
3069 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3070 " like a valid IP" % ip)
3072 self.inst_ip = inst_ip
3074 if self.op.start and not self.op.ip_check:
3075 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3076 " adding an instance in start mode")
3078 if self.op.ip_check:
3079 if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3080 constants.DEFAULT_NODED_PORT):
3081 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3082 (hostname1.ip, instance_name))
3084 # MAC address verification
3085 if self.op.mac != "auto":
3086 if not utils.IsValidMac(self.op.mac.lower()):
3087 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3090 # bridge verification
3091 bridge = getattr(self.op, "bridge", None)
3093 self.op.bridge = self.cfg.GetDefBridge()
3095 self.op.bridge = bridge
3097 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3098 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3099 " destination node '%s'" %
3100 (self.op.bridge, pnode.name))
3102 # boot order verification
3103 if self.op.hvm_boot_order is not None:
3104 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3105 raise errors.OpPrereqError("invalid boot order specified,"
3106 " must be one or more of [acdn]")
3109 self.instance_status = 'up'
3111 self.instance_status = 'down'
3113 def Exec(self, feedback_fn):
3114 """Create and add the instance to the cluster.
3117 instance = self.op.instance_name
3118 pnode_name = self.pnode.name
3120 if self.op.mac == "auto":
3121 mac_address = self.cfg.GenerateMAC()
3123 mac_address = self.op.mac
3125 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3126 if self.inst_ip is not None:
3127 nic.ip = self.inst_ip
3129 ht_kind = self.sstore.GetHypervisorType()
3130 if ht_kind in constants.HTS_REQ_PORT:
3131 network_port = self.cfg.AllocatePort()
3135 disks = _GenerateDiskTemplate(self.cfg,
3136 self.op.disk_template,
3137 instance, pnode_name,
3138 self.secondaries, self.op.disk_size,
3141 iobj = objects.Instance(name=instance, os=self.op.os_type,
3142 primary_node=pnode_name,
3143 memory=self.op.mem_size,
3144 vcpus=self.op.vcpus,
3145 nics=[nic], disks=disks,
3146 disk_template=self.op.disk_template,
3147 status=self.instance_status,
3148 network_port=network_port,
3149 kernel_path=self.op.kernel_path,
3150 initrd_path=self.op.initrd_path,
3151 hvm_boot_order=self.op.hvm_boot_order,
3154 feedback_fn("* creating instance disks...")
3155 if not _CreateDisks(self.cfg, iobj):
3156 _RemoveDisks(iobj, self.cfg)
3157 raise errors.OpExecError("Device creation failed, reverting...")
3159 feedback_fn("adding instance %s to cluster config" % instance)
3161 self.cfg.AddInstance(iobj)
3163 if self.op.wait_for_sync:
3164 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3165 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3166 # make sure the disks are not degraded (still sync-ing is ok)
3168 feedback_fn("* checking mirrors status")
3169 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3174 _RemoveDisks(iobj, self.cfg)
3175 self.cfg.RemoveInstance(iobj.name)
3176 raise errors.OpExecError("There are some degraded disks for"
3179 feedback_fn("creating os for instance %s on node %s" %
3180 (instance, pnode_name))
3182 if iobj.disk_template != constants.DT_DISKLESS:
3183 if self.op.mode == constants.INSTANCE_CREATE:
3184 feedback_fn("* running the instance OS create scripts...")
3185 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3186 raise errors.OpExecError("could not add os for instance %s"
3188 (instance, pnode_name))
3190 elif self.op.mode == constants.INSTANCE_IMPORT:
3191 feedback_fn("* running the instance OS import scripts...")
3192 src_node = self.op.src_node
3193 src_image = self.src_image
3194 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3195 src_node, src_image):
3196 raise errors.OpExecError("Could not import os for instance"
3198 (instance, pnode_name))
3200 # also checked in the prereq part
3201 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3205 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3206 feedback_fn("* starting instance...")
3207 if not rpc.call_instance_start(pnode_name, iobj, None):
3208 raise errors.OpExecError("Could not start instance")
3211 class LUConnectConsole(NoHooksLU):
3212 """Connect to an instance's console.
3214 This is somewhat special in that it returns the command line that
3215 you need to run on the master node in order to connect to the
3219 _OP_REQP = ["instance_name"]
3221 def CheckPrereq(self):
3222 """Check prerequisites.
3224 This checks that the instance is in the cluster.
3227 instance = self.cfg.GetInstanceInfo(
3228 self.cfg.ExpandInstanceName(self.op.instance_name))
3229 if instance is None:
3230 raise errors.OpPrereqError("Instance '%s' not known" %
3231 self.op.instance_name)
3232 self.instance = instance
3234 def Exec(self, feedback_fn):
3235 """Connect to the console of an instance
3238 instance = self.instance
3239 node = instance.primary_node
3241 node_insts = rpc.call_instance_list([node])[node]
3242 if node_insts is False:
3243 raise errors.OpExecError("Can't connect to node %s." % node)
3245 if instance.name not in node_insts:
3246 raise errors.OpExecError("Instance %s is not running." % instance.name)
3248 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3250 hyper = hypervisor.GetHypervisor()
3251 console_cmd = hyper.GetShellCommandForConsole(instance)
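# wrap the hypervisor console command in an ssh invocation to the primary
# node; the resulting argv is what the caller runs from the master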
3253 argv = ["ssh", "-q", "-t"]
3254 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3255 argv.extend(ssh.BATCH_MODE_OPTS)
3257 argv.append(console_cmd)
3261 class LUAddMDDRBDComponent(LogicalUnit):
3262 """Adda new mirror member to an instance's disk.
3265 HPATH = "mirror-add"
3266 HTYPE = constants.HTYPE_INSTANCE
3267 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3269 def BuildHooksEnv(self):
3272 This runs on the master, the primary and all the secondaries.
3276 "NEW_SECONDARY": self.op.remote_node,
3277 "DISK_NAME": self.op.disk_name,
3279 env.update(_BuildInstanceHookEnvByObject(self.instance))
3280 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3281 self.op.remote_node,] + list(self.instance.secondary_nodes)
3284 def CheckPrereq(self):
3285 """Check prerequisites.
3287 This checks that the instance is in the cluster.
3290 instance = self.cfg.GetInstanceInfo(
3291 self.cfg.ExpandInstanceName(self.op.instance_name))
3292 if instance is None:
3293 raise errors.OpPrereqError("Instance '%s' not known" %
3294 self.op.instance_name)
3295 self.instance = instance
3297 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3298 if remote_node is None:
3299 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3300 self.remote_node = remote_node
3302 if remote_node == instance.primary_node:
3303 raise errors.OpPrereqError("The specified node is the primary node of"
3306 if instance.disk_template != constants.DT_REMOTE_RAID1:
3307 raise errors.OpPrereqError("Instance's disk layout is not"
3309 for disk in instance.disks:
3310 if disk.iv_name == self.op.disk_name:
3313 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3314 " instance." % self.op.disk_name)
3315 if len(disk.children) > 1:
3316 raise errors.OpPrereqError("The device already has two slave devices."
3317 " This would create a 3-disk raid1 which we"
3321 def Exec(self, feedback_fn):
3322 """Add the mirror component
3326 instance = self.instance
3328 remote_node = self.remote_node
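# overall flow: generate a new DRBD branch, create it on the secondary and
# then on the primary, attach it as a child of the MD device (rolling back
# the new volumes on failure), and finally wait for the mirror to sync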
3329 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3330 names = _GenerateUniqueNames(self.cfg, lv_names)
3331 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3332 remote_node, disk.size, names)
3334 logger.Info("adding new mirror component on secondary")
3336 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3338 _GetInstanceInfoText(instance)):
3339 raise errors.OpExecError("Failed to create new component on secondary"
3340 " node %s" % remote_node)
3342 logger.Info("adding new mirror component on primary")
3344 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3346 _GetInstanceInfoText(instance)):
3347 # remove secondary dev
3348 self.cfg.SetDiskID(new_drbd, remote_node)
3349 rpc.call_blockdev_remove(remote_node, new_drbd)
3350 raise errors.OpExecError("Failed to create volume on primary")
3352 # the device exists now
3353 # call the primary node to add the mirror to md
3354 logger.Info("adding new mirror component to md")
3355 if not rpc.call_blockdev_addchildren(instance.primary_node,
3357 logger.Error("Can't add mirror component to md!")
3358 self.cfg.SetDiskID(new_drbd, remote_node)
3359 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3360 logger.Error("Can't rollback on secondary")
3361 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3362 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3363 logger.Error("Can't rollback on primary")
3364 raise errors.OpExecError("Can't add mirror component to md array")
3366 disk.children.append(new_drbd)
3368 self.cfg.AddInstance(instance)
3370 _WaitForSync(self.cfg, instance, self.proc)
3375 class LURemoveMDDRBDComponent(LogicalUnit):
3376 """Remove a component from a remote_raid1 disk.
3379 HPATH = "mirror-remove"
3380 HTYPE = constants.HTYPE_INSTANCE
3381 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3383 def BuildHooksEnv(self):
3386 This runs on the master, the primary and all the secondaries.
3390 "DISK_NAME": self.op.disk_name,
3391 "DISK_ID": self.op.disk_id,
3392 "OLD_SECONDARY": self.old_secondary,
3394 env.update(_BuildInstanceHookEnvByObject(self.instance))
3395 nl = [self.sstore.GetMasterNode(),
3396 self.instance.primary_node] + list(self.instance.secondary_nodes)
3399 def CheckPrereq(self):
3400 """Check prerequisites.
3402 This checks that the instance is in the cluster.
3405 instance = self.cfg.GetInstanceInfo(
3406 self.cfg.ExpandInstanceName(self.op.instance_name))
3407 if instance is None:
3408 raise errors.OpPrereqError("Instance '%s' not known" %
3409 self.op.instance_name)
3410 self.instance = instance
3412 if instance.disk_template != constants.DT_REMOTE_RAID1:
3413 raise errors.OpPrereqError("Instance's disk layout is not"
3415 for disk in instance.disks:
3416 if disk.iv_name == self.op.disk_name:
3419 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3420 " instance." % self.op.disk_name)
3421 for child in disk.children:
3422 if (child.dev_type == constants.LD_DRBD7 and
3423 child.logical_id[2] == self.op.disk_id):
3426 raise errors.OpPrereqError("Can't find the device with this port.")
3428 if len(disk.children) < 2:
3429 raise errors.OpPrereqError("Cannot remove the last component from"
3433 if self.child.logical_id[0] == instance.primary_node:
3437 self.old_secondary = self.child.logical_id[oid]
3439 def Exec(self, feedback_fn):
3440 """Remove the mirror component
3443 instance = self.instance
3446 logger.Info("remove mirror component")
3447 self.cfg.SetDiskID(disk, instance.primary_node)
3448 if not rpc.call_blockdev_removechildren(instance.primary_node,
3450 raise errors.OpExecError("Can't remove child from mirror.")
3452 for node in child.logical_id[:2]:
3453 self.cfg.SetDiskID(child, node)
3454 if not rpc.call_blockdev_remove(node, child):
3455 logger.Error("Warning: failed to remove device from node %s,"
3456 " continuing operation." % node)
3458 disk.children.remove(child)
3459 self.cfg.AddInstance(instance)
3462 class LUReplaceDisks(LogicalUnit):
3463 """Replace the disks of an instance.
3466 HPATH = "mirrors-replace"
3467 HTYPE = constants.HTYPE_INSTANCE
3468 _OP_REQP = ["instance_name", "mode", "disks"]
3470 def BuildHooksEnv(self):
3473 This runs on the master, the primary and all the secondaries.
3477 "MODE": self.op.mode,
3478 "NEW_SECONDARY": self.op.remote_node,
3479 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3481 env.update(_BuildInstanceHookEnvByObject(self.instance))
3483 self.sstore.GetMasterNode(),
3484 self.instance.primary_node,
3486 if self.op.remote_node is not None:
3487 nl.append(self.op.remote_node)
3490 def CheckPrereq(self):
3491 """Check prerequisites.
3493 This checks that the instance is in the cluster.
3496 instance = self.cfg.GetInstanceInfo(
3497 self.cfg.ExpandInstanceName(self.op.instance_name))
3498 if instance is None:
3499 raise errors.OpPrereqError("Instance '%s' not known" %
3500 self.op.instance_name)
3501 self.instance = instance
3502 self.op.instance_name = instance.name
3504 if instance.disk_template not in constants.DTS_NET_MIRROR:
3505 raise errors.OpPrereqError("Instance's disk layout is not"
3506 " network mirrored.")
3508 if len(instance.secondary_nodes) != 1:
3509 raise errors.OpPrereqError("The instance has a strange layout,"
3510 " expected one secondary but found %d" %
3511 len(instance.secondary_nodes))
3513 self.sec_node = instance.secondary_nodes[0]
3515 remote_node = getattr(self.op, "remote_node", None)
3516 if remote_node is not None:
3517 remote_node = self.cfg.ExpandNodeName(remote_node)
3518 if remote_node is None:
3519 raise errors.OpPrereqError("Node '%s' not known" %
3520 self.op.remote_node)
3521 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3523 self.remote_node_info = None
3524 if remote_node == instance.primary_node:
3525 raise errors.OpPrereqError("The specified node is the primary node of"
3527 elif remote_node == self.sec_node:
3528 if self.op.mode == constants.REPLACE_DISK_SEC:
3529 # this is for DRBD8, where we can't execute the same mode of
3530 # replacement as for drbd7 (no different port allocated)
3531 raise errors.OpPrereqError("Same secondary given, cannot execute"
3533 # the user gave the current secondary, switch to
3534 # 'no-replace-secondary' mode for drbd7
3536 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3537 self.op.mode != constants.REPLACE_DISK_ALL):
3538 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3539 " disks replacement, not individual ones")
3540 if instance.disk_template == constants.DT_DRBD8:
3541 if (self.op.mode == constants.REPLACE_DISK_ALL and
3542 remote_node is not None):
3543 # switch to replace secondary mode
3544 self.op.mode = constants.REPLACE_DISK_SEC
3546 if self.op.mode == constants.REPLACE_DISK_ALL:
3547 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3548 " secondary disk replacement, not"
3550 elif self.op.mode == constants.REPLACE_DISK_PRI:
3551 if remote_node is not None:
3552 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3553 " the secondary while doing a primary"
3554 " node disk replacement")
3555 self.tgt_node = instance.primary_node
3556 self.oth_node = instance.secondary_nodes[0]
3557 elif self.op.mode == constants.REPLACE_DISK_SEC:
3558 self.new_node = remote_node # this can be None, in which case
3559 # we don't change the secondary
3560 self.tgt_node = instance.secondary_nodes[0]
3561 self.oth_node = instance.primary_node
3563 raise errors.ProgrammerError("Unhandled disk replace mode")
3565 for name in self.op.disks:
3566 if instance.FindDisk(name) is None:
3567 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3568 (name, instance.name))
3569 self.op.remote_node = remote_node
3571 def _ExecRR1(self, feedback_fn):
3572 """Replace the disks of an instance.
3575 instance = self.instance
3578 if self.op.remote_node is None:
3579 remote_node = self.sec_node
3581 remote_node = self.op.remote_node
3583 for dev in instance.disks:
3585 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3586 names = _GenerateUniqueNames(cfg, lv_names)
3587 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3588 remote_node, size, names)
3589 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3590 logger.Info("adding new mirror component on secondary for %s" %
3593 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3595 _GetInstanceInfoText(instance)):
3596 raise errors.OpExecError("Failed to create new component on secondary"
3597 " node %s. Full abort, cleanup manually!" %
3600 logger.Info("adding new mirror component on primary")
3602 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3604 _GetInstanceInfoText(instance)):
3605 # remove secondary dev
3606 cfg.SetDiskID(new_drbd, remote_node)
3607 rpc.call_blockdev_remove(remote_node, new_drbd)
3608 raise errors.OpExecError("Failed to create volume on primary!"
3609 " Full abort, cleanup manually!!")
3611 # the device exists now
3612 # call the primary node to add the mirror to md
3613 logger.Info("adding new mirror component to md")
3614 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3616 logger.Error("Can't add mirror component to md!")
3617 cfg.SetDiskID(new_drbd, remote_node)
3618 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3619 logger.Error("Can't rollback on secondary")
3620 cfg.SetDiskID(new_drbd, instance.primary_node)
3621 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3622 logger.Error("Can't rollback on primary")
3623 raise errors.OpExecError("Full abort, cleanup manually!!")
3625 dev.children.append(new_drbd)
3626 cfg.AddInstance(instance)
3628 # this can fail as the old devices are degraded and _WaitForSync
3629 # does a combined result over all disks, so we don't check its
3631 _WaitForSync(cfg, instance, self.proc, unlock=True)
3633 # so check manually all the devices
3634 for name in iv_names:
3635 dev, child, new_drbd = iv_names[name]
3636 cfg.SetDiskID(dev, instance.primary_node)
3637 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3639 raise errors.OpExecError("MD device %s is degraded!" % name)
3640 cfg.SetDiskID(new_drbd, instance.primary_node)
3641 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3643 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3645 for name in iv_names:
3646 dev, child, new_drbd = iv_names[name]
3647 logger.Info("remove mirror %s component" % name)
3648 cfg.SetDiskID(dev, instance.primary_node)
3649 if not rpc.call_blockdev_removechildren(instance.primary_node,
3651 logger.Error("Can't remove child from mirror, aborting"
3652 " *this device cleanup*.\nYou need to cleanup manually!!")
3655 for node in child.logical_id[:2]:
3656 logger.Info("remove child device on %s" % node)
3657 cfg.SetDiskID(child, node)
3658 if not rpc.call_blockdev_remove(node, child):
3659 logger.Error("Warning: failed to remove device from node %s,"
3660 " continuing operation." % node)
3662 dev.children.remove(child)
3664 cfg.AddInstance(instance)
3666 def _ExecD8DiskOnly(self, feedback_fn):
3667 """Replace a disk on the primary or secondary for dbrd8.
3669 The algorithm for replace is quite complicated:
3670 - for each disk to be replaced:
3671 - create new LVs on the target node with unique names
3672 - detach old LVs from the drbd device
3673 - rename old LVs to name_replaced.<time_t>
3674 - rename new LVs to old LVs
3675 - attach the new LVs (with the old names now) to the drbd device
3676 - wait for sync across all devices
3677 - for each modified disk:
3678 - remove old LVs (which have the name name_replaced.<time_t>)
3680 Failures are not very well handled.
3684 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
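# the per-disk work below follows the docstring: new LVs are created on the
# target node, the DRBD is detached from its old LVs, old and new LVs swap
# names, the DRBD is re-attached, and only after a successful sync are the
# renamed old LVs removed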
3685 instance = self.instance
3687 vgname = self.cfg.GetVGName()
3690 tgt_node = self.tgt_node
3691 oth_node = self.oth_node
3693 # Step: check device activation
3694 self.proc.LogStep(1, steps_total, "check device existence")
3695 info("checking volume groups")
3696 my_vg = cfg.GetVGName()
3697 results = rpc.call_vg_list([oth_node, tgt_node])
3699 raise errors.OpExecError("Can't list volume groups on the nodes")
3700 for node in oth_node, tgt_node:
3701 res = results.get(node, False)
3702 if not res or my_vg not in res:
3703 raise errors.OpExecError("Volume group '%s' not found on %s" %
3705 for dev in instance.disks:
3706 if not dev.iv_name in self.op.disks:
3708 for node in tgt_node, oth_node:
3709 info("checking %s on %s" % (dev.iv_name, node))
3710 cfg.SetDiskID(dev, node)
3711 if not rpc.call_blockdev_find(node, dev):
3712 raise errors.OpExecError("Can't find device %s on node %s" %
3713 (dev.iv_name, node))
3715 # Step: check other node consistency
3716 self.proc.LogStep(2, steps_total, "check peer consistency")
3717 for dev in instance.disks:
3718 if not dev.iv_name in self.op.disks:
3720 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3721 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3722 oth_node==instance.primary_node):
3723 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3724 " to replace disks on this node (%s)" %
3725 (oth_node, tgt_node))
3727 # Step: create new storage
3728 self.proc.LogStep(3, steps_total, "allocate new storage")
3729 for dev in instance.disks:
3730 if not dev.iv_name in self.op.disks:
3733 cfg.SetDiskID(dev, tgt_node)
3734 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3735 names = _GenerateUniqueNames(cfg, lv_names)
3736 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3737 logical_id=(vgname, names[0]))
3738 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3739 logical_id=(vgname, names[1]))
3740 new_lvs = [lv_data, lv_meta]
3741 old_lvs = dev.children
3742 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3743 info("creating new local storage on %s for %s" %
3744 (tgt_node, dev.iv_name))
3745 # since we *always* want to create this LV, we use the
3746 # _Create...OnPrimary (which forces the creation), even if we
3747 # are talking about the secondary node
3748 for new_lv in new_lvs:
3749 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3750 _GetInstanceInfoText(instance)):
3751 raise errors.OpExecError("Failed to create new LV named '%s' on"
3753 (new_lv.logical_id[1], tgt_node))
3755 # Step: for each lv, detach+rename*2+attach
3756 self.proc.LogStep(4, steps_total, "change drbd configuration")
3757 for dev, old_lvs, new_lvs in iv_names.itervalues():
3758 info("detaching %s drbd from local storage" % dev.iv_name)
3759 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3760 raise errors.OpExecError("Can't detach drbd from local storage on node"
3761 " %s for device %s" % (tgt_node, dev.iv_name))
3763 #cfg.Update(instance)
3765 # ok, we created the new LVs, so now we know we have the needed
3766 # storage; as such, we proceed on the target node to rename
3767 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3768 # using the assumption that logical_id == physical_id (which in
3769 # turn is the unique_id on that node)
3771 # FIXME(iustin): use a better name for the replaced LVs
3772 temp_suffix = int(time.time())
3773 ren_fn = lambda d, suff: (d.physical_id[0],
3774 d.physical_id[1] + "_replaced-%s" % suff)
3775 # build the rename list based on what LVs exist on the node
3777 for to_ren in old_lvs:
3778 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3779 if find_res is not None: # device exists
3780 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3782 info("renaming the old LVs on the target node")
3783 if not rpc.call_blockdev_rename(tgt_node, rlist):
3784 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3785 # now we rename the new LVs to the old LVs
3786 info("renaming the new LVs on the target node")
3787 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3788 if not rpc.call_blockdev_rename(tgt_node, rlist):
3789 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3791 for old, new in zip(old_lvs, new_lvs):
3792 new.logical_id = old.logical_id
3793 cfg.SetDiskID(new, tgt_node)
3795 for disk in old_lvs:
3796 disk.logical_id = ren_fn(disk, temp_suffix)
3797 cfg.SetDiskID(disk, tgt_node)
3799 # now that the new lvs have the old name, we can add them to the device
3800 info("adding new mirror component on %s" % tgt_node)
3801 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3802 for new_lv in new_lvs:
3803 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3804 warning("Can't rollback device %s", hint="manually cleanup unused"
3806 raise errors.OpExecError("Can't add local storage to drbd")
3808 dev.children = new_lvs
3809 cfg.Update(instance)
3811 # Step: wait for sync
3813 # this can fail as the old devices are degraded and _WaitForSync
3814 # does a combined result over all disks, so we don't check its
3816 self.proc.LogStep(5, steps_total, "sync devices")
3817 _WaitForSync(cfg, instance, self.proc, unlock=True)
3819 # so check manually all the devices
3820 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3821 cfg.SetDiskID(dev, instance.primary_node)
3822 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3824 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3826 # Step: remove old storage
3827 self.proc.LogStep(6, steps_total, "removing old storage")
3828 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3829 info("remove logical volumes for %s" % name)
3831 cfg.SetDiskID(lv, tgt_node)
3832 if not rpc.call_blockdev_remove(tgt_node, lv):
3833 warning("Can't remove old LV", hint="manually remove unused LVs")
3836 def _ExecD8Secondary(self, feedback_fn):
3837 """Replace the secondary node for drbd8.
3839 The algorithm for replace is quite complicated:
3840 - for all disks of the instance:
3841 - create new LVs on the new node with same names
3842 - shutdown the drbd device on the old secondary
3843 - disconnect the drbd network on the primary
3844 - create the drbd device on the new secondary
3845 - network attach the drbd on the primary, using an artifice:
3846 the drbd code for Attach() will connect to the network if it
3847 finds a device which is connected to the good local disks but
3849 - wait for sync across all devices
3850 - remove all disks from the old secondary
3852 Failures are not very well handled.
3856 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
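# the core trick below: each primary DRBD is switched to standalone by
# blanking its physical_id and re-running blockdev_find, the configuration is
# rewritten to point at the new secondary, and a final blockdev_find makes the
# DRBD reconnect over the network to the freshly created peer devices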
3857 instance = self.instance
3859 vgname = self.cfg.GetVGName()
3862 old_node = self.tgt_node
3863 new_node = self.new_node
3864 pri_node = instance.primary_node
3866 # Step: check device activation
3867 self.proc.LogStep(1, steps_total, "check device existence")
3868 info("checking volume groups")
3869 my_vg = cfg.GetVGName()
3870 results = rpc.call_vg_list([pri_node, new_node])
3872 raise errors.OpExecError("Can't list volume groups on the nodes")
3873 for node in pri_node, new_node:
3874 res = results.get(node, False)
3875 if not res or my_vg not in res:
3876 raise errors.OpExecError("Volume group '%s' not found on %s" %
3878 for dev in instance.disks:
3879 if not dev.iv_name in self.op.disks:
3881 info("checking %s on %s" % (dev.iv_name, pri_node))
3882 cfg.SetDiskID(dev, pri_node)
3883 if not rpc.call_blockdev_find(pri_node, dev):
3884 raise errors.OpExecError("Can't find device %s on node %s" %
3885 (dev.iv_name, pri_node))
3887 # Step: check other node consistency
3888 self.proc.LogStep(2, steps_total, "check peer consistency")
3889 for dev in instance.disks:
3890 if not dev.iv_name in self.op.disks:
3892 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3893 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3894 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3895 " unsafe to replace the secondary" %
3898 # Step: create new storage
3899 self.proc.LogStep(3, steps_total, "allocate new storage")
3900 for dev in instance.disks:
3902 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3903 # since we *always* want to create this LV, we use the
3904 # _Create...OnPrimary (which forces the creation), even if we
3905 # are talking about the secondary node
3906 for new_lv in dev.children:
3907 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3908 _GetInstanceInfoText(instance)):
3909 raise errors.OpExecError("Failed to create new LV named '%s' on"
3911 (new_lv.logical_id[1], new_node))
3913 iv_names[dev.iv_name] = (dev, dev.children)
3915 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3916 for dev in instance.disks:
3918 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3919 # create new devices on new_node
3920 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3921 logical_id=(pri_node, new_node,
3923 children=dev.children)
3924 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3926 _GetInstanceInfoText(instance)):
3927 raise errors.OpExecError("Failed to create new DRBD on"
3928 " node '%s'" % new_node)
3930 for dev in instance.disks:
3931 # we have new devices, shutdown the drbd on the old secondary
3932 info("shutting down drbd for %s on old node" % dev.iv_name)
3933 cfg.SetDiskID(dev, old_node)
3934 if not rpc.call_blockdev_shutdown(old_node, dev):
3935 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3936 hint="Please cleanup this device manually as soon as possible")
3938 info("detaching primary drbds from the network (=> standalone)")
3940 for dev in instance.disks:
3941 cfg.SetDiskID(dev, pri_node)
3942 # set the physical (unique in bdev terms) id to None, meaning
3943 # detach from network
3944 dev.physical_id = (None,) * len(dev.physical_id)
3945 # and 'find' the device, which will 'fix' it to match the
3947 if rpc.call_blockdev_find(pri_node, dev):
3950 warning("Failed to detach drbd %s from network, unusual case" %
3954 # no detaches succeeded (very unlikely)
3955 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3957 # if we managed to detach at least one, we update all the disks of
3958 # the instance to point to the new secondary
3959 info("updating instance configuration")
3960 for dev in instance.disks:
3961 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3962 cfg.SetDiskID(dev, pri_node)
3963 cfg.Update(instance)
    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")
    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")
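
  # Note on the flow above: the secondary replacement walks through the six
  # LogStep phases named in this method (device checks, peer consistency,
  # new storage, drbd reconfiguration, sync, old storage removal).  Failures
  # while removing the old LVs in the last step are only warned about, since
  # at that point the instance is already running on the new secondary.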

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1    # handler for the remote_raid1 template
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)
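
  # Illustrative summary of the dispatch above (no additional logic):
  #   disk_template == remote_raid1            -> self._ExecRR1
  #   disk_template == drbd8, no remote_node   -> self._ExecD8DiskOnly
  #   disk_template == drbd8, remote_node set  -> self._ExecD8Secondary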


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        "hvm_boot_order": instance.hvm_boot_order,
        }

      result[instance.name] = idict

    return result
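
  # The result above maps each instance name to its info dict, for example
  # (made-up data): {"inst1.example.com": {"name": "inst1.example.com",
  # "config_state": "up", "run_state": "down", ...}}.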


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl
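
  # For example (made-up values), a request that changes memory and the first
  # NIC's bridge would produce args = {"memory": 512,
  # "nics": [(<current ip>, "br0", <current mac>)]} before being merged into
  # the instance hook environment by _BuildInstanceHookEnvByObject.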

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result
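
  # The returned list pairs each modified parameter with its new value, for
  # example (made-up values): [("mem", 512), ("bridge", "br0")].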


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
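
  # Example return value (made-up names):
  #   {"node1.example.com": ["inst1.example.com"], "node2.example.com": []}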


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown:
        op = opcodes.OpStartupInstance(instance_name=instance.name,
                                       force=False)
        self.proc.ChainOpCode(op)

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()
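
  # Exec hands back whatever tag collection the target object stores, e.g.
  # (made-up tags) something like set(["production", "rack1"]).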


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
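
  # Example result (made-up data): [("/cluster", "production"),
  # ("/nodes/node1.example.com", "rack1")] -- one (path, tag) pair per tag
  # matching the pattern.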


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """
    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))