# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
import os
import os.path
import sha
import re
import time
import tempfile
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a
    dict containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires a
    cluster to run on (otherwise we don't have a node list). "No
    nodes" should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

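# An editorial sketch (not part of Ganeti itself) of how a hypothetical
# subclass would satisfy the contract described above; the class name,
# hook path and opcode field below are invented for illustration:
#
#   class LUExampleNode(LogicalUnit):
#     HPATH = "example-node"
#     HTYPE = constants.HTYPE_NODE
#     _OP_REQP = ["node_name"]
#
#     def BuildHooksEnv(self):
#       # keys are unprefixed; the hooks runner adds the GANETI_ prefix
#       env = {"NODE_NAME": self.op.node_name}
#       return env, [], [self.op.node_name]
#
#     def CheckPrereq(self):
#       # canonicalize the opcode parameters, as the contract requires
#       node = self.cfg.ExpandNodeName(self.op.node_name)
#       if node is None:
#         raise errors.OpPrereqError("No such node name '%s'" %
#                                    self.op.node_name)
#       self.op.node_name = node
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Would act on node %s" % self.op.node_name)
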
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []

def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])

def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())

def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)

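# Usage sketch (node names hypothetical): inside a LU's CheckPrereq the
# opcode's node list is canonicalized through this helper, e.g.:
#
#   self.nodes = _GetWantedNodes(self, ["node1", "node2"])
#   # -> ["node1.example.com", "node2.example.com"], nicely sorted
#   self.nodes = _GetWantedNodes(self, [])
#   # -> all node names known to the cluster configuration
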
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)

def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))

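# Example (field names hypothetical): a query LU validates the fields
# requested in its opcode before gathering any data, e.g.:
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=self.op.output_fields)
#
# A selection such as ["name", "bogus"] raises OpPrereqError naming the
# unknown field(s).
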
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env

def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)

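# The override dict replaces individual arguments before delegation; a
# hypothetical example:
#
#   env = _BuildInstanceHookEnvByObject(instance,
#                                       override={'status': "down"})
#
# builds the same environment as above, but with INSTANCE_STATUS forced
# to "down" regardless of the object's recorded state.
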
def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False

  save_lines = []
  add_lines = []
  removed = False

  for rawline in f:
    logger.Debug('read %s' % (repr(rawline),))

    parts = rawline.rstrip('\r\n').split()

    # Ignore unwanted lines
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
      fields = parts[0].split(',')
      key = parts[2]

      haveall = True
      havesome = False
      for spec in [ ip, fullnode ]:
        if spec not in fields:
          haveall = False
        if spec in fields:
          havesome = True

      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
      if haveall and key == pubkey:
        inthere = True
        save_lines.append(rawline)
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
        continue

      if havesome and (not haveall or key != pubkey):
        removed = True
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
        continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)

  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()

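# The entries managed above follow the standard OpenSSH known_hosts
# format; the line added for a node looks like (hostname hypothetical,
# key shortened):
#
#   node1.example.com,192.0.2.10 ssh-rsa AAAAB3NzaC1yc2E...
#
# i.e. a comma-separated name/IP list, the key type, and the public key.
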
def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None

def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()

def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))

def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))

class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)

class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)

class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = []

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum

    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    instancelist = self.cfg.GetInstanceList()
    if not instance in instancelist:
      feedback_fn(" - ERROR: instance %s not in instance list %s" %
                  (instance, instancelist))
      bad = True

    instanceconfig = self.cfg.GetInstanceInfo(instance)
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if not instance in node_instance[node_current]:
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    node_volume = {}
    node_instance = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      result = self._VerifyInstance(instance, node_volume, node_instance,
                                    feedback_fn)
      bad = bad or result

      inst_config = self.cfg.GetInstanceInfo(instance)

      inst_config.MapLVsByNode(node_vol_should)

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    return int(bad)

class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result

class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")

def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      utils.Unlock('cmd')
    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        utils.Lock('cmd')

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded

def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Disk degraded or not found on node %s" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result

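# Illustration of the index selection above (assuming the blockdev_find
# status layout used here, where position 5 carries the is_degraded flag
# and position 6 the local-disk status):
#
#   _CheckDiskConsistency(cfg, dev, "node1.example.com", True)
#   # -> False as soon as rstats[5] is set for the device or any child;
#   # pass ldisk=True to test rstats[6] (local storage) instead
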
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")

    return node_data

class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the node exists in the configuration
      - it does not have primary or secondary instances
      - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)

class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output

class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output

class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the new node is not already in the config
      - it is resolvable
      - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)

class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")

class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result

class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))

class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()

class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    data = []
    for node in self.nodes:
      result = ssh.SSHCall(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data

class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info

def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info

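# A successful call returns (True, device_info), where device_info maps
# the instance-visible device names to the node devices returned by the
# assemble calls; with hypothetical values:
#
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   # device_info == [("node1.example.com", "sda", "/dev/drbd0"), ...]
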
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")

class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)

def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are fatal
  (they make the function return False); otherwise they are ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result

def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))

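# Typical usage (see LUStartupInstance below): validate memory before
# starting an instance, with the action folded into the error message:
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)
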
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)

2158 class LUShutdownInstance(LogicalUnit):
2159 """Shutdown an instance.
2162 HPATH = "instance-stop"
2163 HTYPE = constants.HTYPE_INSTANCE
2164 _OP_REQP = ["instance_name"]
2166 def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl
2177 def CheckPrereq(self):
2178 """Check prerequisites.
    This checks that the instance is in the cluster.

    """
2183 instance = self.cfg.GetInstanceInfo(
2184 self.cfg.ExpandInstanceName(self.op.instance_name))
2185 if instance is None:
2186 raise errors.OpPrereqError("Instance '%s' not known" %
2187 self.op.instance_name)
2188 self.instance = instance
2190 def Exec(self, feedback_fn):
2191 """Shutdown the instance.
2194 instance = self.instance
2195 node_current = instance.primary_node
2196 self.cfg.MarkInstanceDown(instance.name)
2197 if not rpc.call_instance_shutdown(node_current, instance):
2198 logger.Error("could not shutdown instance")
2200 _ShutdownInstanceDisks(instance, self.cfg)
2203 class LUReinstallInstance(LogicalUnit):
2204 """Reinstall an instance.
2207 HPATH = "instance-reinstall"
2208 HTYPE = constants.HTYPE_INSTANCE
2209 _OP_REQP = ["instance_name"]
2211 def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl
2222 def CheckPrereq(self):
2223 """Check prerequisites.
    This checks that the instance is in the cluster and is not running.

    """
2228 instance = self.cfg.GetInstanceInfo(
2229 self.cfg.ExpandInstanceName(self.op.instance_name))
2230 if instance is None:
2231 raise errors.OpPrereqError("Instance '%s' not known" %
2232 self.op.instance_name)
2233 if instance.disk_template == constants.DT_DISKLESS:
2234 raise errors.OpPrereqError("Instance '%s' has no disks" %
2235 self.op.instance_name)
2236 if instance.status != "down":
2237 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2238 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)
2258 self.instance = instance
2260 def Exec(self, feedback_fn):
2261 """Reinstall the instance.
2264 inst = self.instance
2266 if self.op.os_type is not None:
2267 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2268 inst.os = self.op.os_type
2269 self.cfg.AddInstance(inst)
2271 _StartInstanceDisks(self.cfg, inst, None)
2273 feedback_fn("Running the instance OS create scripts...")
    if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
      raise errors.OpExecError("Could not install OS for instance %s"
                               " on node %s" %
                               (inst.name, inst.primary_node))
2279 _ShutdownInstanceDisks(inst, self.cfg)
2282 class LURenameInstance(LogicalUnit):
2283 """Rename an instance.
2286 HPATH = "instance-rename"
2287 HTYPE = constants.HTYPE_INSTANCE
2288 _OP_REQP = ["instance_name", "new_name"]
2290 def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl
2302 def CheckPrereq(self):
2303 """Check prerequisites.
    This checks that the instance is in the cluster and is not running.

    """
2308 instance = self.cfg.GetInstanceInfo(
2309 self.cfg.ExpandInstanceName(self.op.instance_name))
2310 if instance is None:
2311 raise errors.OpPrereqError("Instance '%s' not known" %
2312 self.op.instance_name)
2313 if instance.status != "down":
2314 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2315 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
2321 self.instance = instance
2323 # new name verification
2324 name_info = utils.HostInfo(self.op.new_name)
2326 self.op.new_name = new_name = name_info.name
2327 instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)
2332 if not getattr(self.op, "ignore_ip", False):
2333 command = ["fping", "-q", name_info.ip]
2334 result = utils.RunCmd(command)
2335 if not result.failed:
2336 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2337 (name_info.ip, new_name))
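    # fping exits successfully when the target answers, so a non-failed
    # run means the new name's IP is already live somewhere on the
    # network.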
2340 def Exec(self, feedback_fn):
2341 """Reinstall the instance.
2344 inst = self.instance
2345 old_name = inst.name
2347 self.cfg.RenameInstance(inst.name, self.op.new_name)
2349 # re-read the instance from the configuration after rename
2350 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2352 _StartInstanceDisks(self.cfg, inst, None)
    if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                        "sda", "sdb"):
      msg = ("Could not run OS rename script for instance %s on node %s"
             " (but the instance has been renamed in Ganeti)" %
             (inst.name, inst.primary_node))
      logger.Error(msg)
2361 _ShutdownInstanceDisks(inst, self.cfg)
2364 class LURemoveInstance(LogicalUnit):
2365 """Remove an instance.
2368 HPATH = "instance-remove"
2369 HTYPE = constants.HTYPE_INSTANCE
2370 _OP_REQP = ["instance_name"]
2372 def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl
2382 def CheckPrereq(self):
2383 """Check prerequisites.
    This checks that the instance is in the cluster.

    """
2388 instance = self.cfg.GetInstanceInfo(
2389 self.cfg.ExpandInstanceName(self.op.instance_name))
2390 if instance is None:
2391 raise errors.OpPrereqError("Instance '%s' not known" %
2392 self.op.instance_name)
2393 self.instance = instance
2395 def Exec(self, feedback_fn):
2396 """Remove the instance.
2399 instance = self.instance
2400 logger.Info("shutting down instance %s on node %s" %
2401 (instance.name, instance.primary_node))
2403 if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))
2410 logger.Info("removing block devices for instance %s" % instance.name)
2412 if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")
2418 logger.Info("removing instance %s out of cluster config" % instance.name)
2420 self.cfg.RemoveInstance(instance.name)
2423 class LUQueryInstances(NoHooksLU):
2424 """Logical unit for querying instances.
2427 _OP_REQP = ["output_fields", "names"]
2429 def CheckPrereq(self):
2430 """Check prerequisites.
    This checks that the fields required are valid output fields.

    """
2435 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2436 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2437 "admin_state", "admin_ram",
2438 "disk_template", "ip", "mac", "bridge",
2439 "sda_size", "sdb_size", "vcpus"],
2440 dynamic=self.dynamic_fields,
2441 selected=self.op.output_fields)
2443 self.wanted = _GetWantedInstances(self, self.op.names)
2445 def Exec(self, feedback_fn):
2446 """Computes the list of nodes and their attributes.
2449 instance_names = self.wanted
2450 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2453 # begin data gathering
2455 nodes = frozenset([inst.primary_node for inst in instance_list])
2458 if self.dynamic_fields.intersection(self.op.output_fields):
2460 node_data = rpc.call_all_instances_info(nodes)
2462 result = node_data[name]
2464 live_data.update(result)
2465 elif result == False:
2466 bad_nodes.append(name)
2467 # else no instance is alive
2469 live_data = dict([(name, {}) for name in instance_names])
2471 # end data gathering
2474 for instance in instance_list:
2476 for field in self.op.output_fields:
2481 elif field == "pnode":
2482 val = instance.primary_node
2483 elif field == "snodes":
2484 val = list(instance.secondary_nodes)
2485 elif field == "admin_state":
2486 val = (instance.status != "down")
2487 elif field == "oper_state":
2488 if instance.primary_node in bad_nodes:
2491 val = bool(live_data.get(instance.name))
2492 elif field == "status":
2493 if instance.primary_node in bad_nodes:
2494 val = "ERROR_nodedown"
2496 running = bool(live_data.get(instance.name))
2498 if instance.status != "down":
2503 if instance.status != "down":
2507 elif field == "admin_ram":
2508 val = instance.memory
2509 elif field == "oper_ram":
2510 if instance.primary_node in bad_nodes:
2512 elif instance.name in live_data:
2513 val = live_data[instance.name].get("memory", "?")
2516 elif field == "disk_template":
2517 val = instance.disk_template
2519 val = instance.nics[0].ip
2520 elif field == "bridge":
2521 val = instance.nics[0].bridge
2522 elif field == "mac":
2523 val = instance.nics[0].mac
2524 elif field == "sda_size" or field == "sdb_size":
2525 disk = instance.FindDisk(field[:3])
2530 elif field == "vcpus":
2531 val = instance.vcpus
2533 raise errors.ParameterError(field)
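# One row is emitted per instance, with values ordered exactly like
# self.op.output_fields; e.g. querying ["name", "status", "oper_ram"]
# might yield ["inst1.example.com", "running", 512] (hypothetical
# values) for each instance.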
2540 class LUFailoverInstance(LogicalUnit):
2541 """Failover an instance.
2544 HPATH = "instance-failover"
2545 HTYPE = constants.HTYPE_INSTANCE
2546 _OP_REQP = ["instance_name", "ignore_consistency"]
2548 def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl
2561 def CheckPrereq(self):
2562 """Check prerequisites.
    This checks that the instance is in the cluster.

    """
2567 instance = self.cfg.GetInstanceInfo(
2568 self.cfg.ExpandInstanceName(self.op.instance_name))
2569 if instance is None:
2570 raise errors.OpPrereqError("Instance '%s' not known" %
2571 self.op.instance_name)
2573 if instance.disk_template not in constants.DTS_NET_MIRROR:
2574 raise errors.OpPrereqError("Instance's disk layout is not"
2575 " network mirrored, cannot failover.")
2577 secondary_nodes = instance.secondary_nodes
2578 if not secondary_nodes:
2579 raise errors.ProgrammerError("no secondary node but using "
2580 "DT_REMOTE_RAID1 template")
2582 target_node = secondary_nodes[0]
2583 # check memory requirements on the secondary node
2584 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2585 instance.name, instance.memory)
    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))
2594 self.instance = instance
2596 def Exec(self, feedback_fn):
2597 """Failover an instance.
2599 The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
2603 instance = self.instance
2605 source_node = instance.primary_node
2606 target_node = instance.secondary_nodes[0]
2608 feedback_fn("* checking disk consistency between source and target")
2609 for dev in instance.disks:
2610 # for remote_raid1, these are md over drbd
2611 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2612 if instance.status == "up" and not self.op.ignore_consistency:
2613 raise errors.OpExecError("Disk %s is degraded on target node,"
2614 " aborting failover." % dev.iv_name)
2616 feedback_fn("* shutting down instance on source node")
2617 logger.Info("Shutting down instance %s on node %s" %
2618 (instance.name, source_node))
2620 if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))
2629 feedback_fn("* deactivating the instance's disks on source node")
2630 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2631 raise errors.OpExecError("Can't shut down the instance's disks.")
2633 instance.primary_node = target_node
2634 # distribute new instance config to the other nodes
2635 self.cfg.AddInstance(instance)
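    # cfg.AddInstance on an existing instance rewrites its entry and
    # redistributes the configuration, so the new primary node is
    # recorded cluster-wide before any attempt to start the instance.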
2637 # Only start the instance if it's marked as up
2638 if instance.status == "up":
2639 feedback_fn("* activating the instance's disks on target node")
2640 logger.Info("Starting instance %s on node %s" %
2641 (instance.name, target_node))
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")
2649 feedback_fn("* starting the instance on the target node")
2650 if not rpc.call_instance_start(target_node, instance, None):
2651 _ShutdownInstanceDisks(instance, self.cfg)
2652 raise errors.OpExecError("Could not start instance %s on node %s." %
2653 (instance.name, target_node))
2656 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2657 """Create a tree of block devices on the primary node.
  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
2677 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2678 """Create a tree of block devices on a secondary node.
  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
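# 'force' starts out False for the top-level call and flips to True as
# soon as a device reports CreateOnSecondary(), so everything below
# that point (e.g. the LVs backing a drbd device) is physically created
# on the secondary, while purely primary-side layers are skipped.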
2706 def _GenerateUniqueNames(cfg, exts):
2707 """Generate a suitable LV name.
  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
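# Example: _GenerateUniqueNames(cfg, [".sda", ".sdb"]) returns names
# like ["<unique-id>.sda", "<unique-id>.sdb"], later used as LV names
# inside the cluster volume group.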
2719 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2720 """Generate a drbd device complete with its children.
2723 port = cfg.AllocatePort()
2724 vgname = cfg.GetVGName()
2725 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2726 logical_id=(vgname, names[0]))
2727 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2728 logical_id=(vgname, names[1]))
2729 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2730 logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  return drbd_dev
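# The returned branch is a drbd7 device whose children are the data LV
# and a 128 MB metadata LV; the drbd port is allocated cluster-wide
# through cfg.AllocatePort().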
2735 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2736 """Generate a drbd8 device complete with its children.
2739 port = cfg.AllocatePort()
2740 vgname = cfg.GetVGName()
2741 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2742 logical_id=(vgname, names[0]))
2743 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2744 logical_id=(vgname, names[1]))
2745 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2746 logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
2751 def _GenerateDiskTemplate(cfg, template_name,
2752 instance_name, primary_node,
2753 secondary_nodes, disk_sz, swap_sz):
2754 """Generate the entire disk layout for a given template type.
2757 #TODO: compute space requirements
2759 vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
2763 if len(secondary_nodes) != 0:
2764 raise errors.ProgrammerError("Wrong template configuration")
2766 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
2774 elif template_name == constants.DT_LOCAL_RAID1:
2775 if len(secondary_nodes) != 0:
2776 raise errors.ProgrammerError("Wrong template configuration")
2779 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2780 ".sdb_m1", ".sdb_m2"])
2781 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2782 logical_id=(vgname, names[0]))
2783 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2784 logical_id=(vgname, names[1]))
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
                              size=disk_sz,
                              children = [sda_dev_m1, sda_dev_m2])
2788 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2789 logical_id=(vgname, names[2]))
2790 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2791 logical_id=(vgname, names[3]))
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
                              size=swap_sz,
                              children = [sdb_dev_m1, sdb_dev_m2])
2795 disks = [md_sda_dev, md_sdb_dev]
2796 elif template_name == constants.DT_REMOTE_RAID1:
2797 if len(secondary_nodes) != 1:
2798 raise errors.ProgrammerError("Wrong template configuration")
2799 remote_node = secondary_nodes[0]
2800 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2801 ".sdb_data", ".sdb_meta"])
2802 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2803 disk_sz, names[0:2])
2804 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2805 children = [drbd_sda_dev], size=disk_sz)
2806 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2807 swap_sz, names[2:4])
2808 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2809 children = [drbd_sdb_dev], size=swap_sz)
2810 disks = [md_sda_dev, md_sdb_dev]
2811 elif template_name == constants.DT_DRBD8:
2812 if len(secondary_nodes) != 1:
2813 raise errors.ProgrammerError("Wrong template configuration")
2814 remote_node = secondary_nodes[0]
2815 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2816 ".sdb_data", ".sdb_meta"])
2817 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2818 disk_sz, names[0:2], "sda")
2819 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2820 swap_sz, names[2:4], "sdb")
2821 disks = [drbd_sda_dev, drbd_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
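# For example (hypothetical sizes), a DT_DRBD8 request with a 10240 MB
# disk and 4096 MB swap produces two drbd8 devices ("sda", "sdb"), each
# backed on both nodes by a data LV plus a 128 MB metadata LV.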
2827 def _GetInstanceInfoText(instance):
2828 """Compute that text that should be added to the disk's metadata.
2831 return "originstname+%s" % instance.name
2834 def _CreateDisks(cfg, instance):
2835 """Create all disks for an instance.
  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
2846 info = _GetInstanceInfoText(instance)
2848 for device in instance.disks:
2849 logger.Info("creating volume %s for instance %s" %
2850 (device.iv_name, instance.name))
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
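# The ordering above is deliberate: volumes are created on all
# secondaries before the primary, so a secondary failure aborts the
# process before the primary holds a half-configured device.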
2867 def _RemoveDisks(instance, cfg):
2868 """Remove all disks for an instance.
  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
2882 logger.Info("removing block devices for instance %s" % instance.name)
  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result
2896 class LUCreateInstance(LogicalUnit):
2897 """Create an instance.
2900 HPATH = "instance-add"
2901 HTYPE = constants.HTYPE_INSTANCE
2902 _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2903 "disk_template", "swap_size", "mode", "start", "vcpus",
2904 "wait_for_sync", "ip_check", "mac"]
2906 def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
2918 if self.op.mode == constants.INSTANCE_IMPORT:
2919 env["INSTANCE_SRC_NODE"] = self.op.src_node
2920 env["INSTANCE_SRC_PATH"] = self.op.src_path
2921 env["INSTANCE_SRC_IMAGE"] = self.src_image
2923 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2924 primary_node=self.op.pnode,
2925 secondary_nodes=self.secondaries,
2926 status=self.instance_status,
2927 os_type=self.op.os_type,
2928 memory=self.op.mem_size,
2929 vcpus=self.op.vcpus,
2930 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
                                     ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl
2938 def CheckPrereq(self):
2939 """Check prerequisites.
2942 for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2943 if not hasattr(self.op, attr):
2944 setattr(self.op, attr, None)
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)
2951 if self.op.mode == constants.INSTANCE_IMPORT:
2952 src_node = getattr(self.op, "src_node", None)
2953 src_path = getattr(self.op, "src_path", None)
2954 if src_node is None or src_path is None:
2955 raise errors.OpPrereqError("Importing an instance requires source"
2956 " node and path options")
2957 src_node_full = self.cfg.ExpandNodeName(src_node)
2958 if src_node_full is None:
2959 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2960 self.op.src_node = src_node = src_node_full
2962 if not os.path.isabs(src_path):
2963 raise errors.OpPrereqError("The source path must be absolute")
      export_info = rpc.call_export_info(src_node, src_path)
      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
2970 if not export_info.has_section(constants.INISECT_EXP):
2971 raise errors.ProgrammerError("Corrupted export config")
2973 ei_version = export_info.get(constants.INISECT_EXP, 'version')
2974 if (int(ei_version) != constants.EXPORT_VERSION):
2975 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2976 (ei_version, constants.EXPORT_VERSION))
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one disk")
2982 # FIXME: are the old os-es, disk sizes, etc. useful?
2983 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
2987 else: # INSTANCE_CREATE
2988 if getattr(self.op, "os_type", None) is None:
2989 raise errors.OpPrereqError("No guest OS specified")
2991 # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
2998 self.secondaries = []
2999 # disk template and mirror node verification
3000 if self.op.disk_template not in constants.DISK_TEMPLATES:
3001 raise errors.OpPrereqError("Invalid disk template name")
3003 if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
3012 elif snode_name == pnode.name:
3013 raise errors.OpPrereqError("The secondary node cannot be"
3014 " the primary node.")
3015 self.secondaries.append(snode_name)
    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: None,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
      constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
      }
3027 if self.op.disk_template not in req_size_dict:
3028 raise errors.ProgrammerError("Disk template '%s' size requirement"
3029 " is unknown" % self.op.disk_template)
3031 req_size = req_size_dict[self.op.disk_template]
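    # Example: for DT_DRBD8 with disk_size=10240 and swap_size=4096,
    # req_size is 10240 + 4096 + 256 = 14592 MB, checked below on the
    # primary and on the secondary node.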
3033 # Check lv size requirements
3034 if req_size is not None:
3035 nodenames = [pnode.name] + self.secondaries
3036 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3037 for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))
    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)
3057 if self.op.kernel_path == constants.VALUE_NONE:
3058 raise errors.OpPrereqError("Can't set instance kernel to none")
3060 # instance verification
3061 hostname1 = utils.HostInfo(self.op.instance_name)
3063 self.op.instance_name = instance_name = hostname1.name
3064 instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
3079 self.inst_ip = inst_ip
3081 if self.op.start and not self.op.ip_check:
3082 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3083 " adding an instance in start mode")
3085 if self.op.ip_check:
3086 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3087 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3088 (hostname1.ip, instance_name))
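    # TcpPing against the node daemon port doubles as a cheap liveness
    # probe here: anything answering on that IP means the address is
    # already taken.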
3090 # MAC address verification
3091 if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)
3096 # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge
3103 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3104 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3105 " destination node '%s'" %
3106 (self.op.bridge, pnode.name))
3108 # boot order verification
3109 if self.op.hvm_boot_order is not None:
3110 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3111 raise errors.OpPrereqError("invalid boot order specified,"
3112 " must be one or more of [acdn]")
    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'
3119 def Exec(self, feedback_fn):
3120 """Create and add the instance to the cluster.
3123 instance = self.op.instance_name
3124 pnode_name = self.pnode.name
    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac
3131 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3132 if self.inst_ip is not None:
3133 nic.ip = self.inst_ip
3135 ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None
3141 disks = _GenerateDiskTemplate(self.cfg,
3142 self.op.disk_template,
3143 instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)
3147 iobj = objects.Instance(name=instance, os=self.op.os_type,
3148 primary_node=pnode_name,
3149 memory=self.op.mem_size,
3150 vcpus=self.op.vcpus,
3151 nics=[nic], disks=disks,
3152 disk_template=self.op.disk_template,
3153 status=self.instance_status,
3154 network_port=network_port,
3155 kernel_path=self.op.kernel_path,
3156 initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )
3160 feedback_fn("* creating instance disks...")
3161 if not _CreateDisks(self.cfg, iobj):
3162 _RemoveDisks(iobj, self.cfg)
3163 raise errors.OpExecError("Device creation failed, reverting...")
3165 feedback_fn("adding instance %s to cluster config" % instance)
3167 self.cfg.AddInstance(iobj)
3169 if self.op.wait_for_sync:
3170 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3171 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3172 # make sure the disks are not degraded (still sync-ing is ok)
3174 feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")
3185 feedback_fn("creating os for instance %s on node %s" %
3186 (instance, pnode_name))
3188 if iobj.disk_template != constants.DT_DISKLESS:
3189 if self.op.mode == constants.INSTANCE_CREATE:
3190 feedback_fn("* running the instance OS create scripts...")
3191 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))
3196 elif self.op.mode == constants.INSTANCE_IMPORT:
3197 feedback_fn("* running the instance OS import scripts...")
3198 src_node = self.op.src_node
3199 src_image = self.src_image
3200 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3201 src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3212 feedback_fn("* starting instance...")
3213 if not rpc.call_instance_start(pnode_name, iobj, None):
3214 raise errors.OpExecError("Could not start instance")
3217 class LUConnectConsole(NoHooksLU):
3218 """Connect to an instance's console.
  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
3225 _OP_REQP = ["instance_name"]
3227 def CheckPrereq(self):
3228 """Check prerequisites.
    This checks that the instance is in the cluster.

    """
3233 instance = self.cfg.GetInstanceInfo(
3234 self.cfg.ExpandInstanceName(self.op.instance_name))
3235 if instance is None:
3236 raise errors.OpPrereqError("Instance '%s' not known" %
3237 self.op.instance_name)
3238 self.instance = instance
3240 def Exec(self, feedback_fn):
3241 """Connect to the console of an instance
3244 instance = self.instance
3245 node = instance.primary_node
3247 node_insts = rpc.call_instance_list([node])[node]
3248 if node_insts is False:
3249 raise errors.OpExecError("Can't connect to node %s." % node)
3251 if instance.name not in node_insts:
3252 raise errors.OpExecError("Instance %s is not running." % instance.name)
3254 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3256 hyper = hypervisor.GetHypervisor()
3257 console_cmd = hyper.GetShellCommandForConsole(instance)
    argv = ["ssh", "-q", "-t"]
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
    argv.extend(ssh.BATCH_MODE_OPTS)
    argv.append(node)
    argv.append(console_cmd)
    return "ssh", argv
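# The caller is expected to exec the returned command itself (e.g.
# os.execvp(result[0], result[1]) in the CLI), dropping the user
# straight onto the hypervisor console over ssh.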
3267 class LUAddMDDRBDComponent(LogicalUnit):
3268 """Adda new mirror member to an instance's disk.
3271 HPATH = "mirror-add"
3272 HTYPE = constants.HTYPE_INSTANCE
3273 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3275 def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl
3290 def CheckPrereq(self):
3291 """Check prerequisites.
    This checks that the instance is in the cluster.

    """
3296 instance = self.cfg.GetInstanceInfo(
3297 self.cfg.ExpandInstanceName(self.op.instance_name))
3298 if instance is None:
3299 raise errors.OpPrereqError("Instance '%s' not known" %
3300 self.op.instance_name)
3301 self.instance = instance
3303 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3304 if remote_node is None:
3305 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3306 self.remote_node = remote_node
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave devices."
                                 " This would create a 3-disk raid1 which we"
                                 " don't allow.")
    self.disk = disk
3327 def Exec(self, feedback_fn):
3328 """Add the mirror component
3332 instance = self.instance
3334 remote_node = self.remote_node
3335 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3336 names = _GenerateUniqueNames(self.cfg, lv_names)
3337 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3338 remote_node, disk.size, names)
3340 logger.Info("adding new mirror component on secondary")
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
                                      new_drbd, False,
                                      _GetInstanceInfoText(instance)):
3345 raise errors.OpExecError("Failed to create new component on secondary"
3346 " node %s" % remote_node)
3348 logger.Info("adding new mirror component on primary")
3350 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3352 _GetInstanceInfoText(instance)):
3353 # remove secondary dev
3354 self.cfg.SetDiskID(new_drbd, remote_node)
3355 rpc.call_blockdev_remove(remote_node, new_drbd)
3356 raise errors.OpExecError("Failed to create volume on primary")
3358 # the device exists now
3359 # call the primary node to add the mirror to md
3360 logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(instance.primary_node,
                                         disk, [new_drbd]):
      logger.Error("Can't add mirror component to md!")
3364 self.cfg.SetDiskID(new_drbd, remote_node)
3365 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3366 logger.Error("Can't rollback on secondary")
3367 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3368 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3369 logger.Error("Can't rollback on primary")
3370 raise errors.OpExecError("Can't add mirror component to md array")
3372 disk.children.append(new_drbd)
3374 self.cfg.AddInstance(instance)
3376 _WaitForSync(self.cfg, instance, self.proc)
3381 class LURemoveMDDRBDComponent(LogicalUnit):
3382 """Remove a component from a remote_raid1 disk.
3385 HPATH = "mirror-remove"
3386 HTYPE = constants.HTYPE_INSTANCE
3387 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3389 def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl
3405 def CheckPrereq(self):
3406 """Check prerequisites.
    This checks that the instance is in the cluster.

    """
3411 instance = self.cfg.GetInstanceInfo(
3412 self.cfg.ExpandInstanceName(self.op.instance_name))
3413 if instance is None:
3414 raise errors.OpPrereqError("Instance '%s' not known" %
3415 self.op.instance_name)
3416 self.instance = instance
    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if (child.dev_type == constants.LD_DRBD7 and
          child.logical_id[2] == self.op.disk_id):
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")
    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]
3445 def Exec(self, feedback_fn):
3446 """Remove the mirror component
3449 instance = self.instance
3452 logger.Info("remove mirror component")
3453 self.cfg.SetDiskID(disk, instance.primary_node)
3454 if not rpc.call_blockdev_removechildren(instance.primary_node,
3456 raise errors.OpExecError("Can't remove child from mirror.")
3458 for node in child.logical_id[:2]:
3459 self.cfg.SetDiskID(child, node)
3460 if not rpc.call_blockdev_remove(node, child):
3461 logger.Error("Warning: failed to remove device from node %s,"
3462 " continuing operation." % node)
3464 disk.children.remove(child)
3465 self.cfg.AddInstance(instance)
3468 class LUReplaceDisks(LogicalUnit):
3469 """Replace the disks of an instance.
3472 HPATH = "mirrors-replace"
3473 HTYPE = constants.HTYPE_INSTANCE
3474 _OP_REQP = ["instance_name", "mode", "disks"]
3476 def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl
3496 def CheckPrereq(self):
3497 """Check prerequisites.
    This checks that the instance is in the cluster.

    """
3502 instance = self.cfg.GetInstanceInfo(
3503 self.cfg.ExpandInstanceName(self.op.instance_name))
3504 if instance is None:
3505 raise errors.OpPrereqError("Instance '%s' not known" %
3506 self.op.instance_name)
3507 self.instance = instance
3508 self.op.instance_name = instance.name
3510 if instance.disk_template not in constants.DTS_NET_MIRROR:
3511 raise errors.OpPrereqError("Instance's disk layout is not"
3512 " network mirrored.")
3514 if len(instance.secondary_nodes) != 1:
3515 raise errors.OpPrereqError("The instance has a strange layout,"
3516 " expected one secondary but found %d" %
3517 len(instance.secondary_nodes))
3519 self.sec_node = instance.secondary_nodes[0]
3521 remote_node = getattr(self.op, "remote_node", None)
3522 if remote_node is not None:
3523 remote_node = self.cfg.ExpandNodeName(remote_node)
3524 if remote_node is None:
3525 raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
3533 elif remote_node == self.sec_node:
3534 if self.op.mode == constants.REPLACE_DISK_SEC:
3535 # this is for DRBD8, where we can't execute the same mode of
3536 # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replace secondary")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
3542 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3543 self.op.mode != constants.REPLACE_DISK_ALL):
3544 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3545 " disks replacement, not individual ones")
3546 if instance.disk_template == constants.DT_DRBD8:
3547 if (self.op.mode == constants.REPLACE_DISK_ALL and
3548 remote_node is not None):
3549 # switch to replace secondary mode
3550 self.op.mode = constants.REPLACE_DISK_SEC
3552 if self.op.mode == constants.REPLACE_DISK_ALL:
          raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                     " secondary disk replacement, not"
                                     " both at once")
3556 elif self.op.mode == constants.REPLACE_DISK_PRI:
3557 if remote_node is not None:
3558 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3559 " the secondary while doing a primary"
3560 " node disk replacement")
3561 self.tgt_node = instance.primary_node
3562 self.oth_node = instance.secondary_nodes[0]
3563 elif self.op.mode == constants.REPLACE_DISK_SEC:
3564 self.new_node = remote_node # this can be None, in which case
3565 # we don't change the secondary
3566 self.tgt_node = instance.secondary_nodes[0]
3567 self.oth_node = instance.primary_node
3569 raise errors.ProgrammerError("Unhandled disk replace mode")
3571 for name in self.op.disks:
3572 if instance.FindDisk(name) is None:
3573 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3574 (name, instance.name))
3575 self.op.remote_node = remote_node
3577 def _ExecRR1(self, feedback_fn):
3578 """Replace the disks of an instance.
3581 instance = self.instance
3584 if self.op.remote_node is None:
3585 remote_node = self.sec_node
3587 remote_node = self.op.remote_node
3589 for dev in instance.disks:
3591 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3592 names = _GenerateUniqueNames(cfg, lv_names)
3593 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3594 remote_node, size, names)
3595 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)
3606 logger.Info("adding new mirror component on primary")
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
3611 # remove secondary dev
3612 cfg.SetDiskID(new_drbd, remote_node)
3613 rpc.call_blockdev_remove(remote_node, new_drbd)
3614 raise errors.OpExecError("Failed to create volume on primary!"
3615 " Full abort, cleanup manually!!")
3617 # the device exists now
3618 # call the primary node to add the mirror to md
3619 logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
3623 cfg.SetDiskID(new_drbd, remote_node)
3624 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3625 logger.Error("Can't rollback on secondary")
3626 cfg.SetDiskID(new_drbd, instance.primary_node)
3627 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3628 logger.Error("Can't rollback on primary")
3629 raise errors.OpExecError("Full abort, cleanup manually!!")
3631 dev.children.append(new_drbd)
3632 cfg.AddInstance(instance)
    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
3637 _WaitForSync(cfg, instance, self.proc, unlock=True)
3639 # so check manually all the devices
3640 for name in iv_names:
3641 dev, child, new_drbd = iv_names[name]
3642 cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)
3651 for name in iv_names:
3652 dev, child, new_drbd = iv_names[name]
3653 logger.Info("remove mirror %s component" % name)
3654 cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue
3661 for node in child.logical_id[:2]:
3662 logger.Info("remove child device on %s" % node)
3663 cfg.SetDiskID(child, node)
3664 if not rpc.call_blockdev_remove(node, child):
3665 logger.Error("Warning: failed to remove device from node %s,"
3666 " continuing operation." % node)
3668 dev.children.remove(child)
3670 cfg.AddInstance(instance)
3672 def _ExecD8DiskOnly(self, feedback_fn):
3673 """Replace a disk on the primary or secondary for dbrd8.
3675 The algorithm for replace is quite complicated:
3676 - for each disk to be replaced:
3677 - create new LVs on the target node with unique names
3678 - detach old LVs from the drbd device
3679 - rename old LVs to name_replaced.<time_t>
3680 - rename new LVs to old LVs
3681 - attach the new LVs (with the old names now) to the drbd device
3682 - wait for sync across all devices
3683 - for each modified disk:
3684 - remove old LVs (which have the name name_replaces.<time_t>)
3686 Failures are not very well handled.
    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node
3699 # Step: check device activation
3700 self.proc.LogStep(1, steps_total, "check device existence")
3701 info("checking volume groups")
3702 my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
3706 for node in oth_node, tgt_node:
3707 res = results.get(node, False)
3708 if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
3711 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
3715 info("checking %s on %s" % (dev.iv_name, node))
3716 cfg.SetDiskID(dev, node)
3717 if not rpc.call_blockdev_find(node, dev):
3718 raise errors.OpExecError("Can't find device %s on node %s" %
3719 (dev.iv_name, node))
3721 # Step: check other node consistency
3722 self.proc.LogStep(2, steps_total, "check peer consistency")
3723 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
3726 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3727 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3728 oth_node==instance.primary_node):
3729 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3730 " to replace disks on this node (%s)" %
3731 (oth_node, tgt_node))
3733 # Step: create new storage
3734 self.proc.LogStep(3, steps_total, "allocate new storage")
3735 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
3740 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3741 names = _GenerateUniqueNames(cfg, lv_names)
3742 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3743 logical_id=(vgname, names[0]))
3744 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3745 logical_id=(vgname, names[1]))
3746 new_lvs = [lv_data, lv_meta]
3747 old_lvs = dev.children
3748 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3749 info("creating new local storage on %s for %s" %
3750 (tgt_node, dev.iv_name))
3751 # since we *always* want to create this LV, we use the
3752 # _Create...OnPrimary (which forces the creation), even if we
3753 # are talking about the secondary node
3754 for new_lv in new_lvs:
3755 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3756 _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))
3761 # Step: for each lv, detach+rename*2+attach
3762 self.proc.LogStep(4, steps_total, "change drbd configuration")
3763 for dev, old_lvs, new_lvs in iv_names.itervalues():
3764 info("detaching %s drbd from local storage" % dev.iv_name)
3765 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3766 raise errors.OpExecError("Can't detach drbd from local storage on node"
3767 " %s for device %s" % (tgt_node, dev.iv_name))
3769 #cfg.Update(instance)
3771 # ok, we created the new LVs, so now we know we have the needed
3772 # storage; as such, we proceed on the target node to rename
3773 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3774 # using the assumption that logical_id == physical_id (which in
3775 # turn is the unique_id on that node)
3777 # FIXME(iustin): use a better name for the replaced LVs
3778 temp_suffix = int(time.time())
3779 ren_fn = lambda d, suff: (d.physical_id[0],
3780 d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
3784 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3785 if find_res is not None: # device exists
3786 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3788 info("renaming the old LVs on the target node")
3789 if not rpc.call_blockdev_rename(tgt_node, rlist):
3790 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3791 # now we rename the new LVs to the old LVs
3792 info("renaming the new LVs on the target node")
3793 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3794 if not rpc.call_blockdev_rename(tgt_node, rlist):
3795 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3797 for old, new in zip(old_lvs, new_lvs):
3798 new.logical_id = old.logical_id
3799 cfg.SetDiskID(new, tgt_node)
3801 for disk in old_lvs:
3802 disk.logical_id = ren_fn(disk, temp_suffix)
3803 cfg.SetDiskID(disk, tgt_node)
3805 # now that the new lvs have the old name, we can add them to the device
3806 info("adding new mirror component on %s" % tgt_node)
3807 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3808 for new_lv in new_lvs:
3809 if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
3812 raise errors.OpExecError("Can't add local storage to drbd")
3814 dev.children = new_lvs
3815 cfg.Update(instance)
3817 # Step: wait for sync
    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
3822 self.proc.LogStep(5, steps_total, "sync devices")
3823 _WaitForSync(cfg, instance, self.proc, unlock=True)
3825 # so check manually all the devices
3826 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3827 cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3832 # Step: remove old storage
3833 self.proc.LogStep(6, steps_total, "removing old storage")
3834 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
3838 if not rpc.call_blockdev_remove(tgt_node, lv):
3839 warning("Can't remove old LV", hint="manually remove unused LVs")
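  # The detach/rename/attach dance above works because drbd identifies
  # its local storage by LV name: renaming the freshly created LVs to
  # the old names lets the drbd device re-attach its local storage
  # without any change visible at the drbd layer.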
3842 def _ExecD8Secondary(self, feedback_fn):
3843 """Replace the secondary node for drbd8.
3845 The algorithm for replace is quite complicated:
3846 - for all disks of the instance:
3847 - create new LVs on the new node with same names
3848 - shutdown the drbd device on the old secondary
3849 - disconnect the drbd network on the primary
3850 - create the drbd device on the new secondary
3851 - network attach the drbd on the primary, using an artifice:
        the drbd code for Attach() will connect to the network if it
        finds a device which is connected to the good local disks but
        not network enabled
3855 - wait for sync across all devices
3856 - remove all disks from the old secondary
3858 Failures are not very well handled.
    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
3869 new_node = self.new_node
3870 pri_node = instance.primary_node
3872 # Step: check device activation
3873 self.proc.LogStep(1, steps_total, "check device existence")
3874 info("checking volume groups")
3875 my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
3879 for node in pri_node, new_node:
3880 res = results.get(node, False)
3881 if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
3884 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
3887 info("checking %s on %s" % (dev.iv_name, pri_node))
3888 cfg.SetDiskID(dev, pri_node)
3889 if not rpc.call_blockdev_find(pri_node, dev):
3890 raise errors.OpExecError("Can't find device %s on node %s" %
3891 (dev.iv_name, pri_node))
3893 # Step: check other node consistency
3894 self.proc.LogStep(2, steps_total, "check peer consistency")
3895 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
3898 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3899 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)
3904 # Step: create new storage
3905 self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3909 # since we *always* want to create this LV, we use the
3910 # _Create...OnPrimary (which forces the creation), even if we
3911 # are talking about the secondary node
3912 for new_lv in dev.children:
3913 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3914 _GetInstanceInfoText(instance)):
3915 raise errors.OpExecError("Failed to create new LV named '%s' on"
3917 (new_lv.logical_id[1], new_node))
3919 iv_names[dev.iv_name] = (dev, dev.children)
3921 self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
3933 raise errors.OpExecError("Failed to create new DRBD on"
3934 " node '%s'" % new_node)
3936 for dev in instance.disks:
3937 # we have new devices, shutdown the drbd on the old secondary
3938 info("shutting down drbd for %s on old node" % dev.iv_name)
3939 cfg.SetDiskID(dev, old_node)
3940 if not rpc.call_blockdev_shutdown(old_node, dev):
3941 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3942 hint="Please cleanup this device manually as soon as possible")
    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")
    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")
  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)
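
# Illustrative sketch (not part of the original module): a secondary
# replacement for a DRBD8 instance is requested through the opcode layer;
# the slot names below mirror the self.op attributes used by this LU.
#
#   op = opcodes.OpReplaceDisks(instance_name="inst1.example.com",
#                               remote_node="node3.example.com",
#                               disks=["sda", "sdb"])
#
# With remote_node set, Exec() dispatches to _ExecD8Secondary above.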


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data
  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        "hvm_boot_order": instance.hvm_boot_order,
        }

      result[instance.name] = idict

    return result
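
# Example of the mapping returned by LUQueryInstanceData.Exec
# (illustrative, hand-written rather than captured from a cluster):
#
#   {"inst1.example.com": {"name": "inst1.example.com",
#                          "config_state": "up", "run_state": "down",
#                          "pnode": "node1.example.com",
#                          "snodes": ["node2.example.com"],
#                          "memory": 512, "vcpus": 1,
#                          "nics": [("aa:00:00:31:be:5e", "10.0.0.5",
#                                    "xen-br0")],
#                          "disks": [...], ...}}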


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result
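
# Illustrative only: Exec() above returns the applied changes as
# (parameter, new value) pairs, e.g. for an opcode that set mem and ip:
#
#   [("mem", 512), ("ip", "10.0.0.5")]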


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
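
# The export list is keyed by node name; an illustrative (hand-written)
# return value for a two-node cluster with one export:
#
#   {"node1.example.com": ["inst1.example.com"],
#    "node2.example.com": []}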


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name
  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
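
# Tags are matched with re.search against cluster, node and instance
# tags; an illustrative (hand-written) result for the pattern "^prod":
#
#   [("/cluster", "prod"),
#    ("/instances/inst1.example.com", "prod-web")]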


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """
    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))