4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
    validity.

    """
    # Validate that every opcode parameter declared in _OP_REQP was
    # actually supplied by the caller.
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      # NOTE(review): the "if attr_val is None:" guard and the message
      # continuation appear truncated in this copy of the file.
      raise errors.OpPrereqError("Required parameter '%s' missing" %
    # A cluster must have been initialised before any LU may run.
    if not cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    # LUs may only be executed on the master node.
    master = sstore.GetMasterNode()
    if master != utils.HostInfo().name:
      raise errors.OpPrereqError("Commands must be run on the master"

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in the
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    # NOTE(review): the original "return {}, [], []" appears truncated
    # in this copy of the file.
def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  Resolves the given name and adds a /etc/hosts entry mapping the
  host's IP to its full name, with the short name as an alias.

  Args:
    hostname: the name of the host to add to /etc/hosts

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  Removes both the fully-qualified and the short name of the host
  from /etc/hosts.

  Args:
    hostname: the name of the host to remove from /etc/hosts

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    lu: the logical unit on whose behalf we check (provides lu.cfg)
    nodes: List of nodes (strings) or None for all

  Raises errors.OpPrereqError if the argument is not a list or if a
  name does not resolve to a known node.

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []
    for name in nodes:
      # canonicalise short names into full node names
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    # no explicit selection means all nodes
    wanted = lu.cfg.GetNodeList()

  return utils.NiceSort(wanted)
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    lu: the logical unit on whose behalf we check (provides lu.cfg)
    instances: List of instances (strings) or None for all

  Raises errors.OpPrereqError if the argument is not a list or if a
  name does not resolve to a known instance.

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      # canonicalise short names into full instance names
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    # no explicit selection means all instances
    wanted = lu.cfg.GetInstanceList()

  return utils.NiceSort(wanted)
233 def _CheckOutputFields(static, dynamic, selected):
234 """Checks whether all selected fields are valid.
237 static: Static fields
238 dynamic: Dynamic fields
241 static_fields = frozenset(static)
242 dynamic_fields = frozenset(dynamic)
244 all_fields = static_fields | dynamic_fields
246 if not all_fields.issuperset(selected):
247 raise errors.OpPrereqError("Unknown output fields selected: %s"
248 % ",".join(frozenset(selected).
249 difference(all_fields)))
252 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
253 memory, vcpus, nics):
254 """Builds instance related env variables for hooks from single variables.
257 secondary_nodes: List of secondary nodes as strings
261 "INSTANCE_NAME": name,
262 "INSTANCE_PRIMARY": primary_node,
263 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
264 "INSTANCE_OS_TYPE": os_type,
265 "INSTANCE_STATUS": status,
266 "INSTANCE_MEMORY": memory,
267 "INSTANCE_VCPUS": vcpus,
271 nic_count = len(nics)
272 for idx, (ip, bridge, mac) in enumerate(nics):
275 env["INSTANCE_NIC%d_IP" % idx] = ip
276 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
277 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
281 env["INSTANCE_NIC_COUNT"] = nic_count
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  Returns the dict produced by _BuildInstanceHookEnv.

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # FIX: was "instance.os" (copy/paste bug) — the status variable
    # must carry the instance run state, not the OS name; elsewhere in
    # this module the run state is read from instance.status.
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip       - IPv4 address of host (str)
    pubkey   - the public key of the cluster

  """
  # Open the existing file for rewrite, or create it if missing.
  # NOTE(review): the "else:" between the two open() calls appears
  # truncated in this copy of the file.
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
    # Examine every existing line and decide whether to keep it.
    logger.Debug('read %s' % (repr(rawline),))

    parts = rawline.rstrip('\r\n').split()

    # Ignore unwanted lines
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
      fields = parts[0].split(',')

      # Check whether this line already covers both our IP and FQDN.
      for spec in [ ip, fullnode ]:
        if spec not in fields:

      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
      # Keep the line only when it fully matches the expected entry.
      if haveall and key == pubkey:
        save_lines.append(rawline)
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))

      # Partially-matching or mismatched lines are dropped so that a
      # fresh, correct entry can be added below.
      if havesome and (not haveall or key != pubkey):
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))

      save_lines.append(rawline)

    # No valid entry found: schedule a new ssh-rsa line for the node.
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
    newfile = os.fdopen(fd, 'w')
    newfile.write(''.join(save_lines))
    logger.Debug("Wrote new known_hosts.")
    # Atomic replace of the old file with the rewritten one.
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)

    # Simply appending a new line will do the trick.
    for add in add_lines:
387 def _HasValidVG(vglist, vgname):
388 """Checks if the volume group list is valid.
390 A non-None return value means there's an error, and the return value
391 is the error message.
394 vgsize = vglist.get(vgname, None)
396 return "volume group '%s' missing" % vgname
398 return ("volume group '%s' too small (20480MiB required, %dMib found)" %
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  # Back up and remove any pre-existing keypair before regenerating.
  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  # NOTE(review): the remaining ssh-keygen arguments and the
  # "if result.failed:" guard appear truncated in this copy.
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %

  # Add our own freshly generated public key to authorized_keys.
  f = open(pub_key, 'r')
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  Args:
    ss: the simple store (ssconf.SimpleStore) to write the password to

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  # Generate a self-signed certificate valid for five years.
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
    # NOTE(review): the "if result.failed:" guard appears truncated in
    # this copy of the file.
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  # The certificate contains the private key, keep it root-only.
  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

    # NOTE(review): the "if result.failed:" guard appears truncated in
    # this copy of the file.
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  Queries the instance's primary node via RPC and raises
  errors.OpPrereqError if any of the bridges used by its NICs is
  missing there.

  Args:
    instance: the instance object whose NIC bridges must be present

  """
  # check bridges existance
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))
class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    # HVM hypervisors need the VNC password file prepared beforehand.
    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    # refuse loopback-resolving hostnames: other nodes could never
    # reach this machine under that address
    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    # sanity check that the resolved IP actually belongs to this host
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    # the cluster IP must be free before initialisation
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
        # a secondary IP, when given and different from the primary,
        # must also belong to this host
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    # a MAC prefix must look like the first three octets of a MAC
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    # the master network device must exist on this host
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    # only the master node itself may remain before destruction
    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
      # NOTE(review): the "if instancelist:" guard appears truncated in
      # this copy of the file.
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    # keep backup copies of the root ssh keys before leaving the cluster
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)
class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existance and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      # no version data at all usually means the RPC call failed
      feedback_fn(" - ERROR: connection to %s failed" % (node))

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))

    # checks vg existance and size > 20G
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %

      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))

    # checks config file checksum
    if 'filelist' not in node_result:
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
      # compare each required file's remote checksum to the local one
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      feedback_fn(" - ERROR: node hasn't returned node connectivity data")
    # a non-empty 'nodelist' maps peer names to the connection error
    if node_result['nodelist']:
      for node in node_result['nodelist']:
        feedback_fn(" - ERROR: communication with node '%s': %s" %
                    (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    node_current = instanceconfig.primary_node

    instanceconfig.MapLVsByNode(node_vol_should)

    # every expected volume must actually exist on its node
    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %

    # instances not marked down must be running on their primary node
    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))

    # the instance must not appear on any other node
    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances

    # FIXME: verify OS list

    # fingerprint the config files that must be identical cluster-wide
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)

      # node_volume: a string result is an LVM error message from the node
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))

      node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn(" - ERROR: connection to %s failed" % (node,))

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))

        "mfree": int(nodeinfo['memory_free']),
        "dfree": int(nodeinfo['vg_free']),

        # dictionary holding all instances this node is secondary for,
        # grouped by their primary node. Each key is a cluster node, and each
        # value is a list of instances which have the key as primary and the
        # current node as secondary. this is handy to calculate N+1 memory
        # availability if you can only failover from a primary to its
        # secondary.
        "sinst-by-pnode": {},

        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      # track per-node primary instances for the N+1 check
      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"

      # track per-node secondary instances, grouped by primary node
      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,

    # optional check, may be skipped via op.skip_checks
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)

    feedback_fn("* Other Notes")
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    # result is (offline nodes, per-node lvm errors, degraded instances,
    # per-instance missing LVs)
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # only running, network-mirrored instances are relevant here
    for inst in instances:
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    node_lvs = rpc.call_volume_list(nodes, vg_name)

      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        # a string result means the node reported an LVM-level error
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
        res_nodes.append(node)

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        # an offline LV belonging to an instance marks it degraded
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    # NOTE(review): the docstring and the "env = {" opener appear
    # truncated in this copy of the file.
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # the new IP must not already be in use on the network
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    ss.SetKey(ss.SS_MASTER_IP, ip)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

    # Distribute updated ss config to all nodes
    myself = self.cfg.GetNodeInfo(master)
    dist_nodes = self.cfg.GetNodeList()
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying updated ssconf data to all nodes")
    for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
      fname = ss.KeyToFilename(keyname)
      result = rpc.call_upload_file(dist_nodes, fname)
      # best-effort: log failures per node but keep going
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %

    if not rpc.call_node_start_master(master):
      logger.Error("Could not re-enable the master role on the master,"
                   " please restart manually.")
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Polls the primary node for mirror status of all the instance's disks
  and returns True only if none ended up degraded.

  """
  # diskless instances have nothing to wait for
  if not instance.disks:
  proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  # make sure the disks carry the right node IDs before querying
  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
      proc.LogWarning("Can't get any data from node %s" % node)
      raise errors.RemoteError("Can't contact node %s for mirror data,"
                               " aborting." % node)
    for i in range(len(rstats)):
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    # cap the polling interval at one minute
    time.sleep(min(60, max_time))

  proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)

  # only query devices that are assembled on this node
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
    result = result and (not rstats[idx])

  # recurse into child devices; children are always checked with the
  # non-ldisk test
  for child in dev.children:
    result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    # a False result means the RPC layer could not gather OS data
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allows itself to run.

    """
    # (the dict-literal open/close braces are elided in this excerpt)
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    # hooks run everywhere except the node being removed
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the node exists in the configuration
    - it does not have primary or secondary instances
    - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    # NOTE(review): legacy Python 2 comma-form raise; the rest of the
    # file uses new-style raise calls -- worth normalising.
    # (the `if node is None:` guard is elided in this excerpt)
    raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    NOTE(review): the local `node` binding and the Info continuation
    line are elided in this excerpt.
    """
    logger.Info("stopping the node daemon and removing configs from node %s" %

    rpc.call_node_leave_cluster(node.name)

    # stop the node daemon on the departing node
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  NOTE(review): this excerpt elides some lines (field-list tails,
  several branch headers, the output-list initialisations); visible
  statements are preserved unchanged.
  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # fields that need a live RPC to the nodes (vs. pure config data)
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      # at least one live field requested: query the nodes over RPC
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
          # defensively convert the RPC-provided values to ints
          "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
          "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
          "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
          "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
          "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
          "bootid": nodeinfo['bootid'],
          live_data[name] = {}  # node did not answer: no live data
      live_data = dict.fromkeys(nodenames, {})  # no live fields requested

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      # build reverse maps: node -> set of primary/secondary instances
      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    for node in nodelist:
      for field in self.op.output_fields:
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
          # unknown field falls through to an error (else-header elided)
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  NOTE(review): several field-branch bodies are elided in this
  excerpt; visible statements are preserved unchanged.
  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    # instance -> {node: [lv names]} for the reverse lookup below
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    for node in nodenames:
      if node not in volumes or not volumes[node]:  # skip (body elided)

      # sort a copy so the RPC result isn't mutated
      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        for field in self.op.output_fields:
          elif field == "phys":
          elif field == "name":
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the instance owning this LV on this node
            if node not in lv_by_node[inst]:
            if vol['name'] in lv_by_node[inst][node]:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  NOTE(review): this excerpt elides various lines (HPATH, dict braces,
  try/finally wrappers, several `if` failure guards and string
  continuations); visible statements are preserved unchanged.
  """
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the new node is not already in the config
    - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip  # single-homed node
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    # HVM clusters need the VNC password file available for copying
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    gntpem = f.read(8192)

    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    # (the `if result.failed:` guard is elided in this excerpt)
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               (node, result.fail_reason, result.output))

    # check connectivity
    result = rpc.call_version([node])[node]

    if constants.PROTOCOL_VERSION == result:
      logger.Info("communication to node %s fine, sw version %s match" %
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result))
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
        keyarray.append(f.read())

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])
    # (failure guard elided)
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      # ask the new node to ping its own secondary ip from localhost
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    # (the `if not success:` guard is elided)
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)  # no need to copy onto ourselves

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This commands must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      # NOTE(review): message probably means "could NOT disable" --
      # runtime string left unchanged here
      logger.Error("could disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    # record the new master in the simple store and push it out
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """

  def Exec(self, feedback_fn):
    """Return cluster config.

    Returns a dict of static cluster-wide facts (the dict-literal
    braces and the return statement are elided in this excerpt).
    """
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      # (self-skip guard is elided in this excerpt)
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """

  def CheckPrereq(self):
    """No prerequisites.

    """

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()
class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    Collects (node, output, exit_code) tuples (the list initialisation
    and the return statement are elided in this excerpt).
    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    for node in self.nodes:
      result = ssh.SSHCall(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    # failure path (the `if not disks_ok:` guard is elided)
      raise errors.OpExecError("Cannot activate block devices")
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    suceeded with the mapping from node devices to instance devices

  NOTE(review): the `disks_ok`/`device_info` initialisation and a few
  failure-guard headers are elided in this excerpt.
  """
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      # failure branch (`if not result:` header is elided)
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          # secondary failures are fatal unless explicitly ignored

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:  # skip non-primary (body elided)
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      # failure branch (`if not result:` header is elided)
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
      device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  Assembles all disks and, on failure, tears them back down and raises
  OpExecError (the `if not disks_ok:` guard is elided in this excerpt).
  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      # hint the operator that --force skips secondary-node errors
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    Refuses to shut down the disks while the instance is still running
    on its primary node.
    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:  # non-list answer means the RPC failed
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      # (string continuation of this message is elided in this excerpt)
      raise errors.OpExecError("Instance is running, can't shutdown"

    _ShutdownInstanceDisks(instance, self.cfg)
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If the ignore_primary is false, errors on the primary node are
  (the remainder of this sentence and the `result` bookkeeping are
  elided in this excerpt).
  """
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        # primary-node failures count unless explicitly ignored
        if not ignore_primary or node != instance.primary_node:
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Verify that a node can provide the requested amount of free memory.

  The node is queried over RPC for its current resource state; an
  OpPrereqError is raised when the node cannot be contacted, when the
  reported free-memory value is unusable, or when less memory is free
  than requested.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  vg_name = cfg.GetVGName()
  nodeinfo = rpc.call_node_info([node], vg_name)
  # a missing or non-dict answer means the RPC layer failed entirely
  if not (nodeinfo and isinstance(nodeinfo, dict)):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  node_stats = nodeinfo[node]
  free_mem = node_stats.get('memory_free')
  # the hypervisor may return garbage; insist on an integer MiB count
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  NOTE(review): this excerpt elides a few lines (the hooks-env dict
  braces, the BuildHooksEnv return, one argument of the memory check);
  visible statements are preserved unchanged.
  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
      "FORCE": self.op.force,
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    # make sure the target node can actually host the instance
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    # mark the instance up in the config before actually starting it
    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)  # roll back the disks
      raise errors.OpExecError("Could not start instance")
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  NOTE(review): this excerpt elides a few lines (hooks-env braces and
  return, the `else:` before the full-reboot path); visible statements
  are preserved unchanged.
  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    # validate the requested reboot kind before touching anything
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      # soft/hard reboots are delegated to the hypervisor on the node
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
      # full reboot: stop, recycle the disks, start again
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.
    (The return statement is elided in this excerpt.)
    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    # mark it down in the config even if the RPC below fails
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  NOTE(review): this excerpt elides several lines (BuildHooksEnv
  return, a few `if` guards, string continuations and the try/finally
  around the OS-create call); visible statements are unchanged.
  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # ask the node whether the instance is actually live there
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # an OS change was requested: validate it against the primary node
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)  # persist the OS change

    _StartInstanceDisks(self.cfg, inst, None)
    feedback_fn("Running the instance OS create scripts...")
    if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
      raise errors.OpExecError("Could not install OS for instance %s"
                               (inst.name, inst.primary_node))
    # disks are always torn down again afterwards
    _ShutdownInstanceDisks(inst, self.cfg)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  NOTE(review): this excerpt elides a few lines (BuildHooksEnv return,
  one `if` guard, string continuations); the Exec method continues
  beyond this excerpt.
  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # ask the node whether the instance is actually live there
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %

    if not getattr(self.op, "ignore_ip", False):
      # best-effort probe: a responding IP means the name is in use
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))
2438 def Exec(self, feedback_fn):
2439 """Reinstall the instance.
2442 inst = self.instance
2443 old_name = inst.name
2445 self.cfg.RenameInstance(inst.name, self.op.new_name)
2447 # re-read the instance from the configuration after rename
2448 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2450 _StartInstanceDisks(self.cfg, inst, None)
2452 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2454 msg = ("Could run OS rename script for instance %s on node %s (but the"
2455 " instance has been renamed in Ganeti)" %
2456 (inst.name, inst.primary_node))
2459 _ShutdownInstanceDisks(inst, self.cfg)
2462 class LURemoveInstance(LogicalUnit):
2463 """Remove an instance.
2466 HPATH = "instance-remove"
2467 HTYPE = constants.HTYPE_INSTANCE
2468 _OP_REQP = ["instance_name"]
2470 def BuildHooksEnv(self):
# Only the master is in the node list here, unlike the other
# instance LUs which also notify primary/secondary nodes.
2473 This runs on master, primary and secondary nodes of the instance.
2476 env = _BuildInstanceHookEnvByObject(self.instance)
2477 nl = [self.sstore.GetMasterNode()]
# NOTE(review): the return statement of BuildHooksEnv is elided here
# (embedded numbering jumps 2477->2480).
2480 def CheckPrereq(self):
2481 """Check prerequisites.
2483 This checks that the instance is in the cluster.
2486 instance = self.cfg.GetInstanceInfo(
2487 self.cfg.ExpandInstanceName(self.op.instance_name))
2488 if instance is None:
2489 raise errors.OpPrereqError("Instance '%s' not known" %
2490 self.op.instance_name)
2491 self.instance = instance
2493 def Exec(self, feedback_fn):
2494 """Remove the instance.
# Sequence: shut the instance down, remove its block devices, then
# drop it from the cluster configuration. With ignore_failures the
# first two steps degrade to warnings instead of aborting.
2497 instance = self.instance
2498 logger.Info("shutting down instance %s on node %s" %
2499 (instance.name, instance.primary_node))
2501 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2502 if self.op.ignore_failures:
2503 feedback_fn("Warning: can't shutdown instance")
# NOTE(review): the `else:` line pairing this raise with the branch
# above is elided (numbering jumps 2503->2505).
2505 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2506 (instance.name, instance.primary_node))
2508 logger.Info("removing block devices for instance %s" % instance.name)
2510 if not _RemoveDisks(instance, self.cfg):
2511 if self.op.ignore_failures:
2512 feedback_fn("Warning: can't remove instance's disks")
# NOTE(review): the `else:` line is elided here as well (2512->2514).
2514 raise errors.OpExecError("Can't remove instance's disks")
2516 logger.Info("removing instance %s out of cluster config" % instance.name)
2518 self.cfg.RemoveInstance(instance.name)
2521 class LUQueryInstances(NoHooksLU):
2522 """Logical unit for querying instances.
2525 _OP_REQP = ["output_fields", "names"]
2527 def CheckPrereq(self):
2528 """Check prerequisites.
2530 This checks that the fields required are valid output fields.
# Dynamic fields require a live RPC to the nodes; static fields come
# straight from the configuration.
2533 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2534 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2535 "admin_state", "admin_ram",
2536 "disk_template", "ip", "mac", "bridge",
2537 "sda_size", "sdb_size", "vcpus"],
2538 dynamic=self.dynamic_fields,
2539 selected=self.op.output_fields)
2541 self.wanted = _GetWantedInstances(self, self.op.names)
2543 def Exec(self, feedback_fn):
2544 """Computes the list of nodes and their attributes.
2547 instance_names = self.wanted
2548 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2551 # begin data gathering
2553 nodes = frozenset([inst.primary_node for inst in instance_list])
# Live data (running state, memory) is only collected if a dynamic
# field was actually requested; otherwise fake empty entries below.
2556 if self.dynamic_fields.intersection(self.op.output_fields):
2558 node_data = rpc.call_all_instances_info(nodes)
# NOTE(review): the loop header iterating `nodes` and the
# initialization of live_data/bad_nodes are elided (2558->2560).
2560 result = node_data[name]
2562 live_data.update(result)
2563 elif result == False:
# A node answering False is recorded so its instances can be
# reported as ERROR_nodedown instead of merely "not running".
2564 bad_nodes.append(name)
2565 # else no instance is alive
2567 live_data = dict([(name, {}) for name in instance_names])
2569 # end data gathering
2572 for instance in instance_list:
# NOTE(review): per-instance row setup (e.g. `iout = []`) is elided.
2574 for field in self.op.output_fields:
# NOTE(review): the "name"/"os" branches are elided (2574->2579).
2579 elif field == "pnode":
2580 val = instance.primary_node
2581 elif field == "snodes":
2582 val = list(instance.secondary_nodes)
2583 elif field == "admin_state":
2584 val = (instance.status != "down")
2585 elif field == "oper_state":
2586 if instance.primary_node in bad_nodes:
# NOTE(review): the `val = None` / `else:` lines are elided here.
2589 val = bool(live_data.get(instance.name))
2590 elif field == "status":
2591 if instance.primary_node in bad_nodes:
2592 val = "ERROR_nodedown"
# "status" cross-checks configured state vs. observed state; the
# else/ERROR_up/ERROR_down branches are elided in this listing.
2594 running = bool(live_data.get(instance.name))
2596 if instance.status != "down":
2601 if instance.status != "down":
2605 elif field == "admin_ram":
2606 val = instance.memory
2607 elif field == "oper_ram":
2608 if instance.primary_node in bad_nodes:
2610 elif instance.name in live_data:
2611 val = live_data[instance.name].get("memory", "?")
2614 elif field == "disk_template":
2615 val = instance.disk_template
# NOTE(review): the `elif field == "ip":` header is elided (2615->2617).
2617 val = instance.nics[0].ip
2618 elif field == "bridge":
2619 val = instance.nics[0].bridge
2620 elif field == "mac":
2621 val = instance.nics[0].mac
2622 elif field == "sda_size" or field == "sdb_size":
# field[:3] is "sda" or "sdb" -- the interface name of the disk.
2623 disk = instance.FindDisk(field[:3])
2628 elif field == "vcpus":
2629 val = instance.vcpus
# Unknown fields are a programming error: _CheckOutputFields in
# CheckPrereq should have rejected them already.
2631 raise errors.ParameterError(field)
2638 class LUFailoverInstance(LogicalUnit):
2639 """Failover an instance.
2642 HPATH = "instance-failover"
2643 HTYPE = constants.HTYPE_INSTANCE
2644 _OP_REQP = ["instance_name", "ignore_consistency"]
2646 def BuildHooksEnv(self):
2649 This runs on master, primary and secondary nodes of the instance.
# NOTE(review): the `env = {` opener is elided (2649->2653).
2653 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2655 env.update(_BuildInstanceHookEnvByObject(self.instance))
2656 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2659 def CheckPrereq(self):
2660 """Check prerequisites.
2662 This checks that the instance is in the cluster.
2665 instance = self.cfg.GetInstanceInfo(
2666 self.cfg.ExpandInstanceName(self.op.instance_name))
2667 if instance is None:
2668 raise errors.OpPrereqError("Instance '%s' not known" %
2669 self.op.instance_name)
# Failover needs a network-mirrored disk template: the data must
# already exist on the secondary node.
2671 if instance.disk_template not in constants.DTS_NET_MIRROR:
2672 raise errors.OpPrereqError("Instance's disk layout is not"
2673 " network mirrored, cannot failover.")
2675 secondary_nodes = instance.secondary_nodes
2676 if not secondary_nodes:
2677 raise errors.ProgrammerError("no secondary node but using "
2678 "DT_REMOTE_RAID1 template")
2680 target_node = secondary_nodes[0]
2681 # check memory requirements on the secondary node
2682 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2683 instance.name, instance.memory)
2685 # check bridge existance
2686 brlist = [nic.bridge for nic in instance.nics]
2687 if not rpc.call_bridges_exist(target_node, brlist):
2688 raise errors.OpPrereqError("One or more target bridges %s does not"
2689 " exist on destination node '%s'" %
2690 (brlist, target_node))
2692 self.instance = instance
2694 def Exec(self, feedback_fn):
2695 """Failover an instance.
2697 The failover is done by shutting it down on its present node and
2698 starting it on the secondary.
2701 instance = self.instance
2703 source_node = instance.primary_node
2704 target_node = instance.secondary_nodes[0]
2706 feedback_fn("* checking disk consistency between source and target")
2707 for dev in instance.disks:
2708 # for remote_raid1, these are md over drbd
# Degraded disks abort the failover only for a running instance and
# only when ignore_consistency was not requested.
2709 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2710 if instance.status == "up" and not self.op.ignore_consistency:
2711 raise errors.OpExecError("Disk %s is degraded on target node,"
2712 " aborting failover." % dev.iv_name)
2714 feedback_fn("* shutting down instance on source node")
2715 logger.Info("Shutting down instance %s on node %s" %
2716 (instance.name, source_node))
2718 if not rpc.call_instance_shutdown(source_node, instance):
# With ignore_consistency a failed shutdown (e.g. dead source node)
# is logged and the failover proceeds anyway.
2719 if self.op.ignore_consistency:
2720 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2721 " anyway. Please make sure node %s is down" %
2722 (instance.name, source_node, source_node))
# NOTE(review): the `else:` for this raise is elided (2722->2724).
2724 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2725 (instance.name, source_node))
2727 feedback_fn("* deactivating the instance's disks on source node")
2728 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2729 raise errors.OpExecError("Can't shut down the instance's disks.")
# Flip primary/secondary in the configuration before restarting.
2731 instance.primary_node = target_node
2732 # distribute new instance config to the other nodes
2733 self.cfg.AddInstance(instance)
2735 # Only start the instance if it's marked as up
2736 if instance.status == "up":
2737 feedback_fn("* activating the instance's disks on target node")
2738 logger.Info("Starting instance %s on node %s" %
2739 (instance.name, target_node))
2741 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2742 ignore_secondaries=True)
# NOTE(review): the `if not disks_ok:` guard is elided (2742->2744).
2744 _ShutdownInstanceDisks(instance, self.cfg)
2745 raise errors.OpExecError("Can't activate the instance's disks")
2747 feedback_fn("* starting the instance on the target node")
2748 if not rpc.call_instance_start(target_node, instance, None):
2749 _ShutdownInstanceDisks(instance, self.cfg)
2750 raise errors.OpExecError("Could not start instance %s on node %s." %
2751 (instance.name, target_node))
2754 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2755 """Create a tree of block devices on the primary node.
2757 This always creates all devices.
# Depth-first: children are created before the parent device. The
# recursive call's result is used as a boolean, so this function
# returns success/failure (the actual return statements are elided
# from this listing -- numbering jumps 2762->2765 and ends at 2771).
2761 for child in device.children:
2762 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2765 cfg.SetDiskID(device, node)
2766 new_id = rpc.call_blockdev_create(node, device, device.size,
2767 instance.name, True, info)
2770 if device.physical_id is None:
# Record the backend-assigned id so later calls can address the
# device on its node.
2771 device.physical_id = new_id
2775 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2776 """Create a tree of block devices on a secondary node.
2778 If this device type has to be created on secondaries, create it and
2781 If not, just recurse to children keeping the same 'force' value.
# A device that must exist on the secondary (CreateOnSecondary) forces
# creation of its whole subtree; otherwise only children that need it
# are created. NOTE(review): several lines are elided here, including
# the `force = True` after the CreateOnSecondary check and the return
# statements (numbering jumps 2784->2787, 2789->2794, ends at 2800).
2784 if device.CreateOnSecondary():
2787 for child in device.children:
2788 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2789 child, force, info):
2794 cfg.SetDiskID(device, node)
# Note the False argument: this marks the creation as happening on a
# non-primary node (cf. the True in _CreateBlockDevOnPrimary).
2795 new_id = rpc.call_blockdev_create(node, device, device.size,
2796 instance.name, False, info)
2799 if device.physical_id is None:
2800 device.physical_id = new_id
2804 def _GenerateUniqueNames(cfg, exts):
2805 """Generate a suitable LV name.
2807 This will generate a logical volume name for the given instance.
# For each suffix in `exts` a fresh unique id is generated and the
# suffix appended ("<uuid><ext>"). NOTE(review): the `results = []`
# initialization, the loop header over `exts` binding `val`, and the
# `return results` are elided from this listing (2807->2812, ends 2813).
2812 new_id = cfg.GenerateUniqueID()
2813 results.append("%s%s" % (new_id, val))
2817 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2818 """Generate a drbd device complete with its children.
# Builds one legacy (drbd 0.7) mirror leg: a data LV of `size` MB plus
# a fixed 128 MB metadata LV, wrapped in an LD_DRBD7 device spanning
# (primary, secondary) over a freshly allocated port.
2821 port = cfg.AllocatePort()
2822 vgname = cfg.GetVGName()
2823 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2824 logical_id=(vgname, names[0]))
# 128 MB is the drbd metadata volume size used throughout this file.
2825 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2826 logical_id=(vgname, names[1]))
2827 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2828 logical_id = (primary, secondary, port),
2829 children = [dev_data, dev_meta])
# NOTE(review): the `return drbd_dev` is elided (listing ends at 2829,
# next function starts at 2833).
2833 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2834 """Generate a drbd8 device complete with its children.
# Same layout as _GenerateMDDRBDBranch but produces an LD_DRBD8 device
# and additionally takes the interface name (iv_name) for the result.
2837 port = cfg.AllocatePort()
2838 vgname = cfg.GetVGName()
2839 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2840 logical_id=(vgname, names[0]))
2841 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2842 logical_id=(vgname, names[1]))
2843 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2844 logical_id = (primary, secondary, port),
2845 children = [dev_data, dev_meta],
# NOTE(review): the trailing `iv_name=iv_name)` argument line and the
# `return drbd_dev` are elided (2845 -> next def at 2849).
2849 def _GenerateDiskTemplate(cfg, template_name,
2850 instance_name, primary_node,
2851 secondary_nodes, disk_sz, swap_sz):
2852 """Generate the entire disk layout for a given template type.
# Dispatch on template_name and build the two-disk (sda/sdb) layout.
# Each branch validates the number of secondary nodes its template
# allows. NOTE(review): scattered lines are elided throughout (e.g.
# the diskless `disks = []`, some iv_name/size kwargs, and the final
# `return disks`).
2855 #TODO: compute space requirements
2857 vgname = cfg.GetVGName()
2858 if template_name == constants.DT_DISKLESS:
2860 elif template_name == constants.DT_PLAIN:
2861 if len(secondary_nodes) != 0:
2862 raise errors.ProgrammerError("Wrong template configuration")
# Plain: one LV per disk, no mirroring.
2864 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2865 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2866 logical_id=(vgname, names[0]),
2868 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2869 logical_id=(vgname, names[1]),
2871 disks = [sda_dev, sdb_dev]
2872 elif template_name == constants.DT_LOCAL_RAID1:
2873 if len(secondary_nodes) != 0:
2874 raise errors.ProgrammerError("Wrong template configuration")
# Local raid1: two LV mirrors per disk joined by an MD raid1 device,
# all on the primary node.
2877 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2878 ".sdb_m1", ".sdb_m2"])
2879 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2880 logical_id=(vgname, names[0]))
2881 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2882 logical_id=(vgname, names[1]))
2883 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2885 children = [sda_dev_m1, sda_dev_m2])
2886 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2887 logical_id=(vgname, names[2]))
2888 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2889 logical_id=(vgname, names[3]))
2890 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2892 children = [sdb_dev_m1, sdb_dev_m2])
2893 disks = [md_sda_dev, md_sdb_dev]
2894 elif template_name == constants.DT_REMOTE_RAID1:
# Remote raid1: MD over a drbd7 leg mirrored to exactly one
# secondary node.
2895 if len(secondary_nodes) != 1:
2896 raise errors.ProgrammerError("Wrong template configuration")
2897 remote_node = secondary_nodes[0]
2898 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2899 ".sdb_data", ".sdb_meta"])
2900 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2901 disk_sz, names[0:2])
2902 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2903 children = [drbd_sda_dev], size=disk_sz)
2904 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2905 swap_sz, names[2:4])
2906 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2907 children = [drbd_sdb_dev], size=swap_sz)
2908 disks = [md_sda_dev, md_sdb_dev]
2909 elif template_name == constants.DT_DRBD8:
# drbd8 manages its own mirroring; no MD layer on top.
2910 if len(secondary_nodes) != 1:
2911 raise errors.ProgrammerError("Wrong template configuration")
2912 remote_node = secondary_nodes[0]
2913 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2914 ".sdb_data", ".sdb_meta"])
2915 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2916 disk_sz, names[0:2], "sda")
2917 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2918 swap_sz, names[2:4], "sdb")
2919 disks = [drbd_sda_dev, drbd_sdb_dev]
# NOTE(review): the `else:` line for this raise is elided (2919->2921).
2921 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2925 def _GetInstanceInfoText(instance):
# Owner tag stored in the disk metadata so a physical volume can be
# traced back to the instance it belongs to.
2926 """Compute the text that should be added to the disk's metadata.
2929 return "originstname+%s" % instance.name
2932 def _CreateDisks(cfg, instance):
2933 """Create all disks for an instance.
2935 This abstracts away some work from AddInstance.
2938 instance: the instance object
2941 True or False showing the success of the creation process
# For every disk: create it on all secondary nodes first, then on the
# primary. NOTE(review): the `return False` after each error branch
# and the final `return True` are elided from this listing
# (numbering jumps 2954->2957 and ends at 2959).
2944 info = _GetInstanceInfoText(instance)
2946 for device in instance.disks:
2947 logger.Info("creating volume %s for instance %s" %
2948 (device.iv_name, instance.name))
2950 for secondary_node in instance.secondary_nodes:
2951 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2952 device, False, info):
2953 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2954 (device.iv_name, device, secondary_node))
2957 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2958 instance, device, info):
2959 logger.Error("failed to create volume %s on primary!" %
2965 def _RemoveDisks(instance, cfg):
2966 """Remove all disks for an instance.
2968 This abstracts away some work from `AddInstance()` and
2969 `RemoveInstance()`. Note that in case some of the devices couldn't
2970 be removed, the removal will continue with the other ones (compare
2971 with `_CreateDisks()`).
2974 instance: the instance object
2977 True or False showing the success of the removal process
# Walks the full device tree of every disk on every node it lives on
# and removes each block device, logging (not aborting) on failure.
# NOTE(review): the success-flag initialization and the final return
# are elided from this listing (numbering jumps and ends at 2989).
2980 logger.Info("removing block devices for instance %s" % instance.name)
2983 for device in instance.disks:
2984 for node, disk in device.ComputeNodeTree(instance.primary_node):
2985 cfg.SetDiskID(disk, node)
2986 if not rpc.call_blockdev_remove(node, disk):
2987 logger.Error("could not remove block device %s on node %s,"
2988 " continuing anyway" %
2989 (device.iv_name, node))
2994 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2995 """Compute disk size requirements in the volume group
2997 This is currently hard-coded for the two-drive layout.
# Returns the total MB needed in the VG for the given template, or
# None for diskless. NOTE(review): the `req_size_dict = {` opener and
# the closing `}` are elided from this listing (2997->3002, 3007->3010).
3000 # Required free disk space as a function of disk and swap space
3002 constants.DT_DISKLESS: None,
3003 constants.DT_PLAIN: disk_size + swap_size,
# Local raid1 mirrors both LVs on the same node, hence double.
3004 constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3005 # 256 MB are added for drbd metadata, 128MB for each drbd device
3006 constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3007 constants.DT_DRBD8: disk_size + swap_size + 256,
3010 if disk_template not in req_size_dict:
3011 raise errors.ProgrammerError("Disk template '%s' size requirement"
3012 " is unknown" % disk_template)
3014 return req_size_dict[disk_template]
3017 class LUCreateInstance(LogicalUnit):
3018 """Create an instance.
# NOTE(review): this whole class is an elided listing -- embedded line
# numbers jump throughout, so guard conditions, `else:` lines, closing
# parentheses and some statements are missing from this view.
3021 HPATH = "instance-add"
3022 HTYPE = constants.HTYPE_INSTANCE
3023 _OP_REQP = ["instance_name", "mem_size", "disk_size",
3024 "disk_template", "swap_size", "mode", "start", "vcpus",
3025 "wait_for_sync", "ip_check", "mac"]
3027 def _RunAllocator(self):
3028 """Run the allocator based on input opcode.
# Asks the configured iallocator plugin to pick the node(s), then
# stores its answer into self.op.pnode / self.op.snode so the rest of
# CheckPrereq can proceed exactly as with user-supplied nodes.
3031 disks = [{"size": self.op.disk_size, "mode": "w"},
3032 {"size": self.op.swap_size, "mode": "w"}]
3033 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3034 "bridge": self.op.bridge}]
3035 ial = IAllocator(self.cfg, self.sstore,
3036 mode=constants.IALLOCATOR_MODE_ALLOC,
3037 name=self.op.instance_name,
3038 disk_template=self.op.disk_template,
3041 vcpus=self.op.vcpus,
3042 mem_size=self.op.mem_size,
3047 ial.Run(self.op.iallocator)
# NOTE(review): the `if not ial.success:` guard is elided (3047->3050).
3050 raise errors.OpPrereqError("Can't compute nodes using"
3051 " iallocator '%s': %s" % (self.op.iallocator,
3053 if len(ial.nodes) != ial.required_nodes:
3054 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3055 " of nodes (%s), required %s" %
3056 (len(ial.nodes), ial.required_nodes))
3057 self.op.pnode = ial.nodes[0]
3058 logger.ToStdout("Selected nodes for the instance: %s" %
3059 (", ".join(ial.nodes),))
3060 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3061 (self.op.instance_name, self.op.iallocator, ial.nodes))
# Mirrored templates require a second node; take it from the answer.
3062 if ial.required_nodes == 2:
3063 self.op.snode = ial.nodes[1]
3065 def BuildHooksEnv(self):
3068 This runs on master, primary and secondary nodes of the instance.
# NOTE(review): the `env = {` opener is elided (3068->3072).
3072 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3073 "INSTANCE_DISK_SIZE": self.op.disk_size,
3074 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3075 "INSTANCE_ADD_MODE": self.op.mode,
3077 if self.op.mode == constants.INSTANCE_IMPORT:
3078 env["INSTANCE_SRC_NODE"] = self.op.src_node
3079 env["INSTANCE_SRC_PATH"] = self.op.src_path
3080 env["INSTANCE_SRC_IMAGE"] = self.src_image
3082 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3083 primary_node=self.op.pnode,
3084 secondary_nodes=self.secondaries,
3085 status=self.instance_status,
3086 os_type=self.op.os_type,
3087 memory=self.op.mem_size,
3088 vcpus=self.op.vcpus,
3089 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3092 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3097 def CheckPrereq(self):
3098 """Check prerequisites.
3101 # set optional parameters to none if they don't exist
3102 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3104 if not hasattr(self.op, attr):
3105 setattr(self.op, attr, None)
3107 if self.op.mode not in (constants.INSTANCE_CREATE,
3108 constants.INSTANCE_IMPORT):
3109 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
# Import mode: validate the source node/path and the export found
# there before anything else.
3112 if self.op.mode == constants.INSTANCE_IMPORT:
3113 src_node = getattr(self.op, "src_node", None)
3114 src_path = getattr(self.op, "src_path", None)
3115 if src_node is None or src_path is None:
3116 raise errors.OpPrereqError("Importing an instance requires source"
3117 " node and path options")
3118 src_node_full = self.cfg.ExpandNodeName(src_node)
3119 if src_node_full is None:
3120 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3121 self.op.src_node = src_node = src_node_full
3123 if not os.path.isabs(src_path):
3124 raise errors.OpPrereqError("The source path must be absolute")
3126 export_info = rpc.call_export_info(src_node, src_path)
# NOTE(review): the `if not export_info:` guard is elided (3126->3129).
3129 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3131 if not export_info.has_section(constants.INISECT_EXP):
3132 raise errors.ProgrammerError("Corrupted export config")
3134 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3135 if (int(ei_version) != constants.EXPORT_VERSION):
3136 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3137 (ei_version, constants.EXPORT_VERSION))
3139 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3140 raise errors.OpPrereqError("Can't import instance with more than"
3143 # FIXME: are the old os-es, disk sizes, etc. useful?
3144 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3145 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3147 self.src_image = diskimage
3148 else: # INSTANCE_CREATE
3149 if getattr(self.op, "os_type", None) is None:
3150 raise errors.OpPrereqError("No guest OS specified")
3152 #### instance parameters check
3154 # disk template and mirror node verification
3155 if self.op.disk_template not in constants.DISK_TEMPLATES:
3156 raise errors.OpPrereqError("Invalid disk template name")
3158 # instance name verification
3159 hostname1 = utils.HostInfo(self.op.instance_name)
3161 self.op.instance_name = instance_name = hostname1.name
3162 instance_list = self.cfg.GetInstanceList()
3163 if instance_name in instance_list:
3164 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3167 # ip validity checks
# ip may be None/"none" (no IP), "auto" (use the resolved host IP),
# or an explicit address which must be well-formed.
3168 ip = getattr(self.op, "ip", None)
3169 if ip is None or ip.lower() == "none":
3171 elif ip.lower() == "auto":
3172 inst_ip = hostname1.ip
3174 if not utils.IsValidIP(ip):
3175 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3176 " like a valid IP" % ip)
3178 self.inst_ip = self.op.ip = inst_ip
3180 if self.op.start and not self.op.ip_check:
3181 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3182 " adding an instance in start mode")
3184 if self.op.ip_check:
# A TCP ping answering on the noded port means the name's address is
# already taken by a live host.
3185 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3186 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3187 (hostname1.ip, instance_name))
3189 # MAC address verification
3190 if self.op.mac != "auto":
3191 if not utils.IsValidMac(self.op.mac.lower()):
3192 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3195 # bridge verification
3196 bridge = getattr(self.op, "bridge", None)
3198 self.op.bridge = self.cfg.GetDefBridge()
3200 self.op.bridge = bridge
3202 # boot order verification
# Only the characters a/c/d/n are valid HVM boot-order entries;
# stripping them must leave nothing.
3203 if self.op.hvm_boot_order is not None:
3204 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3205 raise errors.OpPrereqError("invalid boot order specified,"
3206 " must be one or more of [acdn]")
# Exactly one of iallocator / explicit primary node must be given.
3209 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3210 raise errors.OpPrereqError("One and only one of iallocator and primary"
3211 " node must be given")
3213 if self.op.iallocator is not None:
3214 self._RunAllocator()
3216 #### node related checks
3218 # check primary node
3219 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3221 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3223 self.op.pnode = pnode.name
3225 self.secondaries = []
3227 # mirror node verification
3228 if self.op.disk_template in constants.DTS_NET_MIRROR:
3229 if getattr(self.op, "snode", None) is None:
3230 raise errors.OpPrereqError("The networked disk templates need"
3233 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3234 if snode_name is None:
3235 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3237 elif snode_name == pnode.name:
3238 raise errors.OpPrereqError("The secondary node cannot be"
3239 " the primary node.")
3240 self.secondaries.append(snode_name)
3242 req_size = _ComputeDiskSize(self.op.disk_template,
3243 self.op.disk_size, self.op.swap_size)
3245 # Check lv size requirements
# req_size is None for diskless instances, which skips this block.
3246 if req_size is not None:
3247 nodenames = [pnode.name] + self.secondaries
3248 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3249 for node in nodenames:
3250 info = nodeinfo.get(node, None)
3252 raise errors.OpPrereqError("Cannot get current information"
3253 " from node '%s'" % nodeinfo)
3254 vg_free = info.get('vg_free', None)
3255 if not isinstance(vg_free, int):
3256 raise errors.OpPrereqError("Can't compute free disk space on"
3258 if req_size > info['vg_free']:
3259 raise errors.OpPrereqError("Not enough disk space on target node %s."
3260 " %d MB available, %d MB required" %
3261 (node, info['vg_free'], req_size))
# OS verification on the primary node.
3264 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3266 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3267 " primary node" % self.op.os_type)
3269 if self.op.kernel_path == constants.VALUE_NONE:
3270 raise errors.OpPrereqError("Can't set instance kernel to none")
3273 # bridge check on primary node
3274 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3275 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3276 " destination node '%s'" %
3277 (self.op.bridge, pnode.name))
# NOTE(review): the `if self.op.start:` / `else:` pair around these
# two assignments is elided (3277->3280->3282).
3280 self.instance_status = 'up'
3282 self.instance_status = 'down'
3284 def Exec(self, feedback_fn):
3285 """Create and add the instance to the cluster.
3288 instance = self.op.instance_name
3289 pnode_name = self.pnode.name
# MAC: either freshly generated or the explicit one from the opcode.
3291 if self.op.mac == "auto":
3292 mac_address = self.cfg.GenerateMAC()
3294 mac_address = self.op.mac
3296 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3297 if self.inst_ip is not None:
3298 nic.ip = self.inst_ip
# Some hypervisors need a network port (e.g. for a VNC console).
3300 ht_kind = self.sstore.GetHypervisorType()
3301 if ht_kind in constants.HTS_REQ_PORT:
3302 network_port = self.cfg.AllocatePort()
3306 disks = _GenerateDiskTemplate(self.cfg,
3307 self.op.disk_template,
3308 instance, pnode_name,
3309 self.secondaries, self.op.disk_size,
3312 iobj = objects.Instance(name=instance, os=self.op.os_type,
3313 primary_node=pnode_name,
3314 memory=self.op.mem_size,
3315 vcpus=self.op.vcpus,
3316 nics=[nic], disks=disks,
3317 disk_template=self.op.disk_template,
3318 status=self.instance_status,
3319 network_port=network_port,
3320 kernel_path=self.op.kernel_path,
3321 initrd_path=self.op.initrd_path,
3322 hvm_boot_order=self.op.hvm_boot_order,
3325 feedback_fn("* creating instance disks...")
# On disk-creation failure, remove whatever was created and abort.
3326 if not _CreateDisks(self.cfg, iobj):
3327 _RemoveDisks(iobj, self.cfg)
3328 raise errors.OpExecError("Device creation failed, reverting...")
3330 feedback_fn("adding instance %s to cluster config" % instance)
3332 self.cfg.AddInstance(iobj)
# Wait for the mirrors: either fully (wait_for_sync) or a one-shot
# degradation check for networked templates.
3334 if self.op.wait_for_sync:
3335 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3336 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3337 # make sure the disks are not degraded (still sync-ing is ok)
3339 feedback_fn("* checking mirrors status")
3340 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
# NOTE(review): the `if disk_abort:` guard for the rollback below is
# elided (3340->3345).
3345 _RemoveDisks(iobj, self.cfg)
3346 self.cfg.RemoveInstance(iobj.name)
3347 raise errors.OpExecError("There are some degraded disks for"
3350 feedback_fn("creating os for instance %s on node %s" %
3351 (instance, pnode_name))
3353 if iobj.disk_template != constants.DT_DISKLESS:
3354 if self.op.mode == constants.INSTANCE_CREATE:
3355 feedback_fn("* running the instance OS create scripts...")
3356 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3357 raise errors.OpExecError("could not add os for instance %s"
3359 (instance, pnode_name))
3361 elif self.op.mode == constants.INSTANCE_IMPORT:
3362 feedback_fn("* running the instance OS import scripts...")
3363 src_node = self.op.src_node
3364 src_image = self.src_image
3365 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3366 src_node, src_image):
3367 raise errors.OpExecError("Could not import os for instance"
3369 (instance, pnode_name))
3371 # also checked in the prereq part
3372 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
# NOTE(review): the `if self.op.start:` guard around the final start
# sequence is elided (3372->3376).
3376 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3377 feedback_fn("* starting instance...")
3378 if not rpc.call_instance_start(pnode_name, iobj, None):
3379 raise errors.OpExecError("Could not start instance")
3382 class LUConnectConsole(NoHooksLU):
3383 """Connect to an instance's console.
3385 This is somewhat special in that it returns the command line that
3386 you need to run on the master node in order to connect to the
3390 _OP_REQP = ["instance_name"]
3392 def CheckPrereq(self):
3393 """Check prerequisites.
3395 This checks that the instance is in the cluster.
3398 instance = self.cfg.GetInstanceInfo(
3399 self.cfg.ExpandInstanceName(self.op.instance_name))
3400 if instance is None:
3401 raise errors.OpPrereqError("Instance '%s' not known" %
3402 self.op.instance_name)
3403 self.instance = instance
3405 def Exec(self, feedback_fn):
3406 """Connect to the console of an instance
# Verifies the instance is actually running on its primary node, then
# builds the ssh command line that runs the hypervisor's console
# command on that node.
3409 instance = self.instance
3410 node = instance.primary_node
3412 node_insts = rpc.call_instance_list([node])[node]
3413 if node_insts is False:
3414 raise errors.OpExecError("Can't connect to node %s." % node)
3416 if instance.name not in node_insts:
3417 raise errors.OpExecError("Instance %s is not running." % instance.name)
3419 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3421 hyper = hypervisor.GetHypervisor()
3422 console_cmd = hyper.GetShellCommandForConsole(instance)
# Non-interactive ssh options plus -t to allocate a tty for the
# console session.
3424 argv = ["ssh", "-q", "-t"]
3425 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3426 argv.extend(ssh.BATCH_MODE_OPTS)
3428 argv.append(console_cmd)
# NOTE(review): the target-host argument line (3427) and the final
# return of the assembled command are elided from this listing.
3432 class LUAddMDDRBDComponent(LogicalUnit):
3433 """Adda new mirror member to an instance's disk.
3436 HPATH = "mirror-add"
3437 HTYPE = constants.HTYPE_INSTANCE
3438 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3440 def BuildHooksEnv(self):
3443 This runs on the master, the primary and all the secondaries.
3447 "NEW_SECONDARY": self.op.remote_node,
3448 "DISK_NAME": self.op.disk_name,
3450 env.update(_BuildInstanceHookEnvByObject(self.instance))
3451 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3452 self.op.remote_node,] + list(self.instance.secondary_nodes)
3455 def CheckPrereq(self):
3456 """Check prerequisites.
3458 This checks that the instance is in the cluster.
3461 instance = self.cfg.GetInstanceInfo(
3462 self.cfg.ExpandInstanceName(self.op.instance_name))
3463 if instance is None:
3464 raise errors.OpPrereqError("Instance '%s' not known" %
3465 self.op.instance_name)
3466 self.instance = instance
3468 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3469 if remote_node is None:
3470 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3471 self.remote_node = remote_node
3473 if remote_node == instance.primary_node:
3474 raise errors.OpPrereqError("The specified node is the primary node of"
3477 if instance.disk_template != constants.DT_REMOTE_RAID1:
3478 raise errors.OpPrereqError("Instance's disk layout is not"
3480 for disk in instance.disks:
3481 if disk.iv_name == self.op.disk_name:
3484 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3485 " instance." % self.op.disk_name)
3486 if len(disk.children) > 1:
3487 raise errors.OpPrereqError("The device already has two slave devices."
3488 " This would create a 3-disk raid1 which we"
3492 def Exec(self, feedback_fn):
3493 """Add the mirror component
3497 instance = self.instance
3499 remote_node = self.remote_node
3500 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3501 names = _GenerateUniqueNames(self.cfg, lv_names)
3502 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3503 remote_node, disk.size, names)
3505 logger.Info("adding new mirror component on secondary")
3507 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3509 _GetInstanceInfoText(instance)):
3510 raise errors.OpExecError("Failed to create new component on secondary"
3511 " node %s" % remote_node)
3513 logger.Info("adding new mirror component on primary")
3515 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3517 _GetInstanceInfoText(instance)):
3518 # remove secondary dev
3519 self.cfg.SetDiskID(new_drbd, remote_node)
3520 rpc.call_blockdev_remove(remote_node, new_drbd)
3521 raise errors.OpExecError("Failed to create volume on primary")
3523 # the device exists now
3524 # call the primary node to add the mirror to md
3525 logger.Info("adding new mirror component to md")
3526 if not rpc.call_blockdev_addchildren(instance.primary_node,
3528 logger.Error("Can't add mirror compoment to md!")
3529 self.cfg.SetDiskID(new_drbd, remote_node)
3530 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3531 logger.Error("Can't rollback on secondary")
3532 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3533 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3534 logger.Error("Can't rollback on primary")
3535 raise errors.OpExecError("Can't add mirror component to md array")
3537 disk.children.append(new_drbd)
3539 self.cfg.AddInstance(instance)
3541 _WaitForSync(self.cfg, instance, self.proc)
3546 class LURemoveMDDRBDComponent(LogicalUnit):
3547 """Remove a component from a remote_raid1 disk.
3550 HPATH = "mirror-remove"
3551 HTYPE = constants.HTYPE_INSTANCE
3552 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3554 def BuildHooksEnv(self):
3557 This runs on the master, the primary and all the secondaries.
3561 "DISK_NAME": self.op.disk_name,
3562 "DISK_ID": self.op.disk_id,
3563 "OLD_SECONDARY": self.old_secondary,
3565 env.update(_BuildInstanceHookEnvByObject(self.instance))
3566 nl = [self.sstore.GetMasterNode(),
3567 self.instance.primary_node] + list(self.instance.secondary_nodes)
3570 def CheckPrereq(self):
3571 """Check prerequisites.
3573 This checks that the instance is in the cluster.
3576 instance = self.cfg.GetInstanceInfo(
3577 self.cfg.ExpandInstanceName(self.op.instance_name))
3578 if instance is None:
3579 raise errors.OpPrereqError("Instance '%s' not known" %
3580 self.op.instance_name)
3581 self.instance = instance
3583 if instance.disk_template != constants.DT_REMOTE_RAID1:
3584 raise errors.OpPrereqError("Instance's disk layout is not"
3586 for disk in instance.disks:
3587 if disk.iv_name == self.op.disk_name:
3590 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3591 " instance." % self.op.disk_name)
3592 for child in disk.children:
3593 if (child.dev_type == constants.LD_DRBD7 and
3594 child.logical_id[2] == self.op.disk_id):
3597 raise errors.OpPrereqError("Can't find the device with this port.")
3599 if len(disk.children) < 2:
3600 raise errors.OpPrereqError("Cannot remove the last component from"
3604 if self.child.logical_id[0] == instance.primary_node:
3608 self.old_secondary = self.child.logical_id[oid]
3610 def Exec(self, feedback_fn):
3611 """Remove the mirror component
3614 instance = self.instance
3617 logger.Info("remove mirror component")
3618 self.cfg.SetDiskID(disk, instance.primary_node)
3619 if not rpc.call_blockdev_removechildren(instance.primary_node,
3621 raise errors.OpExecError("Can't remove child from mirror.")
3623 for node in child.logical_id[:2]:
3624 self.cfg.SetDiskID(child, node)
3625 if not rpc.call_blockdev_remove(node, child):
3626 logger.Error("Warning: failed to remove device from node %s,"
3627 " continuing operation." % node)
3629 disk.children.remove(child)
3630 self.cfg.AddInstance(instance)
3633 class LUReplaceDisks(LogicalUnit):
3634 """Replace the disks of an instance.
3637 HPATH = "mirrors-replace"
3638 HTYPE = constants.HTYPE_INSTANCE
3639 _OP_REQP = ["instance_name", "mode", "disks"]
3641 def _RunAllocator(self):
3642 """Compute a new secondary node using an IAllocator.
3645 ial = IAllocator(self.cfg, self.sstore,
3646 mode=constants.IALLOCATOR_MODE_RELOC,
3647 name=self.op.instance_name,
3648 relocate_from=[self.sec_node])
3650 ial.Run(self.op.iallocator)
3653 raise errors.OpPrereqError("Can't compute nodes using"
3654 " iallocator '%s': %s" % (self.op.iallocator,
3656 if len(ial.nodes) != ial.required_nodes:
3657 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3658 " of nodes (%s), required %s" %
3659 (len(ial.nodes), ial.required_nodes))
3660 self.op.remote_node = ial.nodes[0]
3661 logger.ToStdout("Selected new secondary for the instance: %s" %
3662 self.op.remote_node)
3664 def BuildHooksEnv(self):
3667 This runs on the master, the primary and all the secondaries.
3671 "MODE": self.op.mode,
3672 "NEW_SECONDARY": self.op.remote_node,
3673 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3675 env.update(_BuildInstanceHookEnvByObject(self.instance))
3677 self.sstore.GetMasterNode(),
3678 self.instance.primary_node,
3680 if self.op.remote_node is not None:
3681 nl.append(self.op.remote_node)
3684 def CheckPrereq(self):
3685 """Check prerequisites.
3687 This checks that the instance is in the cluster.
3690 if not hasattr(self.op, "remote_node"):
3691 self.op.remote_node = None
3693 instance = self.cfg.GetInstanceInfo(
3694 self.cfg.ExpandInstanceName(self.op.instance_name))
3695 if instance is None:
3696 raise errors.OpPrereqError("Instance '%s' not known" %
3697 self.op.instance_name)
3698 self.instance = instance
3699 self.op.instance_name = instance.name
3701 if instance.disk_template not in constants.DTS_NET_MIRROR:
3702 raise errors.OpPrereqError("Instance's disk layout is not"
3703 " network mirrored.")
3705 if len(instance.secondary_nodes) != 1:
3706 raise errors.OpPrereqError("The instance has a strange layout,"
3707 " expected one secondary but found %d" %
3708 len(instance.secondary_nodes))
3710 self.sec_node = instance.secondary_nodes[0]
3712 ia_name = getattr(self.op, "iallocator", None)
3713 if ia_name is not None:
3714 if self.op.remote_node is not None:
3715 raise errors.OpPrereqError("Give either the iallocator or the new"
3716 " secondary, not both")
3717 self.op.remote_node = self._RunAllocator()
3719 remote_node = self.op.remote_node
3720 if remote_node is not None:
3721 remote_node = self.cfg.ExpandNodeName(remote_node)
3722 if remote_node is None:
3723 raise errors.OpPrereqError("Node '%s' not known" %
3724 self.op.remote_node)
3725 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3727 self.remote_node_info = None
3728 if remote_node == instance.primary_node:
3729 raise errors.OpPrereqError("The specified node is the primary node of"
3731 elif remote_node == self.sec_node:
3732 if self.op.mode == constants.REPLACE_DISK_SEC:
3733 # this is for DRBD8, where we can't execute the same mode of
3734 # replacement as for drbd7 (no different port allocated)
3735 raise errors.OpPrereqError("Same secondary given, cannot execute"
3737 # the user gave the current secondary, switch to
3738 # 'no-replace-secondary' mode for drbd7
3740 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3741 self.op.mode != constants.REPLACE_DISK_ALL):
3742 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3743 " disks replacement, not individual ones")
3744 if instance.disk_template == constants.DT_DRBD8:
3745 if (self.op.mode == constants.REPLACE_DISK_ALL and
3746 remote_node is not None):
3747 # switch to replace secondary mode
3748 self.op.mode = constants.REPLACE_DISK_SEC
3750 if self.op.mode == constants.REPLACE_DISK_ALL:
3751 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3752 " secondary disk replacement, not"
3754 elif self.op.mode == constants.REPLACE_DISK_PRI:
3755 if remote_node is not None:
3756 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3757 " the secondary while doing a primary"
3758 " node disk replacement")
3759 self.tgt_node = instance.primary_node
3760 self.oth_node = instance.secondary_nodes[0]
3761 elif self.op.mode == constants.REPLACE_DISK_SEC:
3762 self.new_node = remote_node # this can be None, in which case
3763 # we don't change the secondary
3764 self.tgt_node = instance.secondary_nodes[0]
3765 self.oth_node = instance.primary_node
3767 raise errors.ProgrammerError("Unhandled disk replace mode")
3769 for name in self.op.disks:
3770 if instance.FindDisk(name) is None:
3771 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3772 (name, instance.name))
3773 self.op.remote_node = remote_node
3775 def _ExecRR1(self, feedback_fn):
3776 """Replace the disks of an instance.
3779 instance = self.instance
3782 if self.op.remote_node is None:
3783 remote_node = self.sec_node
3785 remote_node = self.op.remote_node
3787 for dev in instance.disks:
3789 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3790 names = _GenerateUniqueNames(cfg, lv_names)
3791 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3792 remote_node, size, names)
3793 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3794 logger.Info("adding new mirror component on secondary for %s" %
3797 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3799 _GetInstanceInfoText(instance)):
3800 raise errors.OpExecError("Failed to create new component on secondary"
3801 " node %s. Full abort, cleanup manually!" %
3804 logger.Info("adding new mirror component on primary")
3806 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3808 _GetInstanceInfoText(instance)):
3809 # remove secondary dev
3810 cfg.SetDiskID(new_drbd, remote_node)
3811 rpc.call_blockdev_remove(remote_node, new_drbd)
3812 raise errors.OpExecError("Failed to create volume on primary!"
3813 " Full abort, cleanup manually!!")
3815 # the device exists now
3816 # call the primary node to add the mirror to md
3817 logger.Info("adding new mirror component to md")
3818 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3820 logger.Error("Can't add mirror compoment to md!")
3821 cfg.SetDiskID(new_drbd, remote_node)
3822 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3823 logger.Error("Can't rollback on secondary")
3824 cfg.SetDiskID(new_drbd, instance.primary_node)
3825 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3826 logger.Error("Can't rollback on primary")
3827 raise errors.OpExecError("Full abort, cleanup manually!!")
3829 dev.children.append(new_drbd)
3830 cfg.AddInstance(instance)
3832 # this can fail as the old devices are degraded and _WaitForSync
3833 # does a combined result over all disks, so we don't check its
3835 _WaitForSync(cfg, instance, self.proc, unlock=True)
3837 # so check manually all the devices
3838 for name in iv_names:
3839 dev, child, new_drbd = iv_names[name]
3840 cfg.SetDiskID(dev, instance.primary_node)
3841 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3843 raise errors.OpExecError("MD device %s is degraded!" % name)
3844 cfg.SetDiskID(new_drbd, instance.primary_node)
3845 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3847 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3849 for name in iv_names:
3850 dev, child, new_drbd = iv_names[name]
3851 logger.Info("remove mirror %s component" % name)
3852 cfg.SetDiskID(dev, instance.primary_node)
3853 if not rpc.call_blockdev_removechildren(instance.primary_node,
3855 logger.Error("Can't remove child from mirror, aborting"
3856 " *this device cleanup*.\nYou need to cleanup manually!!")
3859 for node in child.logical_id[:2]:
3860 logger.Info("remove child device on %s" % node)
3861 cfg.SetDiskID(child, node)
3862 if not rpc.call_blockdev_remove(node, child):
3863 logger.Error("Warning: failed to remove device from node %s,"
3864 " continuing operation." % node)
3866 dev.children.remove(child)
3868 cfg.AddInstance(instance)
3870 def _ExecD8DiskOnly(self, feedback_fn):
3871 """Replace a disk on the primary or secondary for dbrd8.
3873 The algorithm for replace is quite complicated:
3874 - for each disk to be replaced:
3875 - create new LVs on the target node with unique names
3876 - detach old LVs from the drbd device
3877 - rename old LVs to name_replaced.<time_t>
3878 - rename new LVs to old LVs
3879 - attach the new LVs (with the old names now) to the drbd device
3880 - wait for sync across all devices
3881 - for each modified disk:
3882 - remove old LVs (which have the name name_replaces.<time_t>)
3884 Failures are not very well handled.
3888 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3889 instance = self.instance
3891 vgname = self.cfg.GetVGName()
3894 tgt_node = self.tgt_node
3895 oth_node = self.oth_node
3897 # Step: check device activation
3898 self.proc.LogStep(1, steps_total, "check device existence")
3899 info("checking volume groups")
3900 my_vg = cfg.GetVGName()
3901 results = rpc.call_vg_list([oth_node, tgt_node])
3903 raise errors.OpExecError("Can't list volume groups on the nodes")
3904 for node in oth_node, tgt_node:
3905 res = results.get(node, False)
3906 if not res or my_vg not in res:
3907 raise errors.OpExecError("Volume group '%s' not found on %s" %
3909 for dev in instance.disks:
3910 if not dev.iv_name in self.op.disks:
3912 for node in tgt_node, oth_node:
3913 info("checking %s on %s" % (dev.iv_name, node))
3914 cfg.SetDiskID(dev, node)
3915 if not rpc.call_blockdev_find(node, dev):
3916 raise errors.OpExecError("Can't find device %s on node %s" %
3917 (dev.iv_name, node))
3919 # Step: check other node consistency
3920 self.proc.LogStep(2, steps_total, "check peer consistency")
3921 for dev in instance.disks:
3922 if not dev.iv_name in self.op.disks:
3924 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3925 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3926 oth_node==instance.primary_node):
3927 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3928 " to replace disks on this node (%s)" %
3929 (oth_node, tgt_node))
3931 # Step: create new storage
3932 self.proc.LogStep(3, steps_total, "allocate new storage")
3933 for dev in instance.disks:
3934 if not dev.iv_name in self.op.disks:
3937 cfg.SetDiskID(dev, tgt_node)
3938 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3939 names = _GenerateUniqueNames(cfg, lv_names)
3940 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3941 logical_id=(vgname, names[0]))
3942 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3943 logical_id=(vgname, names[1]))
3944 new_lvs = [lv_data, lv_meta]
3945 old_lvs = dev.children
3946 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3947 info("creating new local storage on %s for %s" %
3948 (tgt_node, dev.iv_name))
3949 # since we *always* want to create this LV, we use the
3950 # _Create...OnPrimary (which forces the creation), even if we
3951 # are talking about the secondary node
3952 for new_lv in new_lvs:
3953 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3954 _GetInstanceInfoText(instance)):
3955 raise errors.OpExecError("Failed to create new LV named '%s' on"
3957 (new_lv.logical_id[1], tgt_node))
3959 # Step: for each lv, detach+rename*2+attach
3960 self.proc.LogStep(4, steps_total, "change drbd configuration")
3961 for dev, old_lvs, new_lvs in iv_names.itervalues():
3962 info("detaching %s drbd from local storage" % dev.iv_name)
3963 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3964 raise errors.OpExecError("Can't detach drbd from local storage on node"
3965 " %s for device %s" % (tgt_node, dev.iv_name))
3967 #cfg.Update(instance)
3969 # ok, we created the new LVs, so now we know we have the needed
3970 # storage; as such, we proceed on the target node to rename
3971 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3972 # using the assumption that logical_id == physical_id (which in
3973 # turn is the unique_id on that node)
3975 # FIXME(iustin): use a better name for the replaced LVs
3976 temp_suffix = int(time.time())
3977 ren_fn = lambda d, suff: (d.physical_id[0],
3978 d.physical_id[1] + "_replaced-%s" % suff)
3979 # build the rename list based on what LVs exist on the node
3981 for to_ren in old_lvs:
3982 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3983 if find_res is not None: # device exists
3984 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3986 info("renaming the old LVs on the target node")
3987 if not rpc.call_blockdev_rename(tgt_node, rlist):
3988 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3989 # now we rename the new LVs to the old LVs
3990 info("renaming the new LVs on the target node")
3991 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3992 if not rpc.call_blockdev_rename(tgt_node, rlist):
3993 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3995 for old, new in zip(old_lvs, new_lvs):
3996 new.logical_id = old.logical_id
3997 cfg.SetDiskID(new, tgt_node)
3999 for disk in old_lvs:
4000 disk.logical_id = ren_fn(disk, temp_suffix)
4001 cfg.SetDiskID(disk, tgt_node)
4003 # now that the new lvs have the old name, we can add them to the device
4004 info("adding new mirror component on %s" % tgt_node)
4005 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4006 for new_lv in new_lvs:
4007 if not rpc.call_blockdev_remove(tgt_node, new_lv):
4008 warning("Can't rollback device %s", hint="manually cleanup unused"
4010 raise errors.OpExecError("Can't add local storage to drbd")
4012 dev.children = new_lvs
4013 cfg.Update(instance)
4015 # Step: wait for sync
4017 # this can fail as the old devices are degraded and _WaitForSync
4018 # does a combined result over all disks, so we don't check its
4020 self.proc.LogStep(5, steps_total, "sync devices")
4021 _WaitForSync(cfg, instance, self.proc, unlock=True)
4023 # so check manually all the devices
4024 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4025 cfg.SetDiskID(dev, instance.primary_node)
4026 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4028 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4030 # Step: remove old storage
4031 self.proc.LogStep(6, steps_total, "removing old storage")
4032 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4033 info("remove logical volumes for %s" % name)
4035 cfg.SetDiskID(lv, tgt_node)
4036 if not rpc.call_blockdev_remove(tgt_node, lv):
4037 warning("Can't remove old LV", hint="manually remove unused LVs")
4040 def _ExecD8Secondary(self, feedback_fn):
4041 """Replace the secondary node for drbd8.
4043 The algorithm for replace is quite complicated:
4044 - for all disks of the instance:
4045 - create new LVs on the new node with same names
4046 - shutdown the drbd device on the old secondary
4047 - disconnect the drbd network on the primary
4048 - create the drbd device on the new secondary
4049 - network attach the drbd on the primary, using an artifice:
4050 the drbd code for Attach() will connect to the network if it
4051 finds a device which is connected to the good local disks but
4053 - wait for sync across all devices
4054 - remove all disks from the old secondary
4056 Failures are not very well handled.
4060 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4061 instance = self.instance
4063 vgname = self.cfg.GetVGName()
4066 old_node = self.tgt_node
4067 new_node = self.new_node
4068 pri_node = instance.primary_node
4070 # Step: check device activation
4071 self.proc.LogStep(1, steps_total, "check device existence")
4072 info("checking volume groups")
4073 my_vg = cfg.GetVGName()
4074 results = rpc.call_vg_list([pri_node, new_node])
4076 raise errors.OpExecError("Can't list volume groups on the nodes")
4077 for node in pri_node, new_node:
4078 res = results.get(node, False)
4079 if not res or my_vg not in res:
4080 raise errors.OpExecError("Volume group '%s' not found on %s" %
4082 for dev in instance.disks:
4083 if not dev.iv_name in self.op.disks:
4085 info("checking %s on %s" % (dev.iv_name, pri_node))
4086 cfg.SetDiskID(dev, pri_node)
4087 if not rpc.call_blockdev_find(pri_node, dev):
4088 raise errors.OpExecError("Can't find device %s on node %s" %
4089 (dev.iv_name, pri_node))
4091 # Step: check other node consistency
4092 self.proc.LogStep(2, steps_total, "check peer consistency")
4093 for dev in instance.disks:
4094 if not dev.iv_name in self.op.disks:
4096 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4097 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4098 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4099 " unsafe to replace the secondary" %
4102 # Step: create new storage
4103 self.proc.LogStep(3, steps_total, "allocate new storage")
4104 for dev in instance.disks:
4106 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4107 # since we *always* want to create this LV, we use the
4108 # _Create...OnPrimary (which forces the creation), even if we
4109 # are talking about the secondary node
4110 for new_lv in dev.children:
4111 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4112 _GetInstanceInfoText(instance)):
4113 raise errors.OpExecError("Failed to create new LV named '%s' on"
4115 (new_lv.logical_id[1], new_node))
4117 iv_names[dev.iv_name] = (dev, dev.children)
4119 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4120 for dev in instance.disks:
4122 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4123 # create new devices on new_node
4124 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4125 logical_id=(pri_node, new_node,
4127 children=dev.children)
4128 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4130 _GetInstanceInfoText(instance)):
4131 raise errors.OpExecError("Failed to create new DRBD on"
4132 " node '%s'" % new_node)
4134 for dev in instance.disks:
4135 # we have new devices, shutdown the drbd on the old secondary
4136 info("shutting down drbd for %s on old node" % dev.iv_name)
4137 cfg.SetDiskID(dev, old_node)
4138 if not rpc.call_blockdev_shutdown(old_node, dev):
4139 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4140 hint="Please cleanup this device manually as soon as possible")
4142 info("detaching primary drbds from the network (=> standalone)")
4144 for dev in instance.disks:
4145 cfg.SetDiskID(dev, pri_node)
4146 # set the physical (unique in bdev terms) id to None, meaning
4147 # detach from network
4148 dev.physical_id = (None,) * len(dev.physical_id)
4149 # and 'find' the device, which will 'fix' it to match the
4151 if rpc.call_blockdev_find(pri_node, dev):
4154 warning("Failed to detach drbd %s from network, unusual case" %
4158 # no detaches succeeded (very unlikely)
4159 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4161 # if we managed to detach at least one, we update all the disks of
4162 # the instance to point to the new secondary
4163 info("updating instance configuration")
4164 for dev in instance.disks:
4165 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4166 cfg.SetDiskID(dev, pri_node)
4167 cfg.Update(instance)
4169 # and now perform the drbd attach
4170 info("attaching primary drbds to new secondary (standalone => connected)")
4172 for dev in instance.disks:
4173 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4174 # since the attach is smart, it's enough to 'find' the device,
4175 # it will automatically activate the network, if the physical_id
4177 cfg.SetDiskID(dev, pri_node)
4178 if not rpc.call_blockdev_find(pri_node, dev):
4179 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4180 "please do a gnt-instance info to see the status of disks")
4182 # this can fail as the old devices are degraded and _WaitForSync
4183 # does a combined result over all disks, so we don't check its
4185 self.proc.LogStep(5, steps_total, "sync devices")
4186 _WaitForSync(cfg, instance, self.proc, unlock=True)
4188 # so check manually all the devices
4189 for name, (dev, old_lvs) in iv_names.iteritems():
4190 cfg.SetDiskID(dev, pri_node)
4191 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4193 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4195 self.proc.LogStep(6, steps_total, "removing old storage")
4196 for name, (dev, old_lvs) in iv_names.iteritems():
4197 info("remove logical volumes for %s" % name)
4199 cfg.SetDiskID(lv, old_node)
4200 if not rpc.call_blockdev_remove(old_node, lv):
4201 warning("Can't remove LV on old secondary",
4202 hint="Cleanup stale volumes by hand")
4204 def Exec(self, feedback_fn):
4205 """Execute disk replacement.
4207 This dispatches the disk replacement to the appropriate handler.
4210 instance = self.instance
4211 if instance.disk_template == constants.DT_REMOTE_RAID1:
4213 elif instance.disk_template == constants.DT_DRBD8:
4214 if self.op.remote_node is None:
4215 fn = self._ExecD8DiskOnly
4217 fn = self._ExecD8Secondary
4219 raise errors.ProgrammerError("Unhandled disk replacement case")
4220 return fn(feedback_fn)
4223 class LUQueryInstanceData(NoHooksLU):
4224 """Query runtime instance data.
4227 _OP_REQP = ["instances"]
4229 def CheckPrereq(self):
4230 """Check prerequisites.
4232 This only checks the optional instance list against the existing names.
4235 if not isinstance(self.op.instances, list):
4236 raise errors.OpPrereqError("Invalid argument type 'instances'")
4237 if self.op.instances:
4238 self.wanted_instances = []
4239 names = self.op.instances
4241 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4242 if instance is None:
4243 raise errors.OpPrereqError("No such instance name '%s'" % name)
4244 self.wanted_instances.append(instance)
4246 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4247 in self.cfg.GetInstanceList()]
4251 def _ComputeDiskStatus(self, instance, snode, dev):
4252 """Compute block device status.
4255 self.cfg.SetDiskID(dev, instance.primary_node)
4256 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4257 if dev.dev_type in constants.LDS_DRBD:
4258 # we change the snode then (otherwise we use the one passed in)
4259 if dev.logical_id[0] == instance.primary_node:
4260 snode = dev.logical_id[1]
4262 snode = dev.logical_id[0]
4265 self.cfg.SetDiskID(dev, snode)
4266 dev_sstatus = rpc.call_blockdev_find(snode, dev)
4271 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4272 for child in dev.children]
4277 "iv_name": dev.iv_name,
4278 "dev_type": dev.dev_type,
4279 "logical_id": dev.logical_id,
4280 "physical_id": dev.physical_id,
4281 "pstatus": dev_pstatus,
4282 "sstatus": dev_sstatus,
4283 "children": dev_children,
4288 def Exec(self, feedback_fn):
4289 """Gather and return data"""
4291 for instance in self.wanted_instances:
4292 remote_info = rpc.call_instance_info(instance.primary_node,
4294 if remote_info and "state" in remote_info:
4297 remote_state = "down"
4298 if instance.status == "down":
4299 config_state = "down"
4303 disks = [self._ComputeDiskStatus(instance, None, device)
4304 for device in instance.disks]
4307 "name": instance.name,
4308 "config_state": config_state,
4309 "run_state": remote_state,
4310 "pnode": instance.primary_node,
4311 "snodes": instance.secondary_nodes,
4313 "memory": instance.memory,
4314 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4316 "network_port": instance.network_port,
4317 "vcpus": instance.vcpus,
4318 "kernel_path": instance.kernel_path,
4319 "initrd_path": instance.initrd_path,
4320 "hvm_boot_order": instance.hvm_boot_order,
4323 result[instance.name] = idict
4328 class LUSetInstanceParms(LogicalUnit):
4329 """Modifies an instances's parameters.
4332 HPATH = "instance-modify"
4333 HTYPE = constants.HTYPE_INSTANCE
4334 _OP_REQP = ["instance_name"]
4336 def BuildHooksEnv(self):
4339 This runs on the master, primary and secondaries.
4344 args['memory'] = self.mem
4346 args['vcpus'] = self.vcpus
4347 if self.do_ip or self.do_bridge or self.mac:
4351 ip = self.instance.nics[0].ip
4353 bridge = self.bridge
4355 bridge = self.instance.nics[0].bridge
4359 mac = self.instance.nics[0].mac
4360 args['nics'] = [(ip, bridge, mac)]
4361 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4362 nl = [self.sstore.GetMasterNode(),
4363 self.instance.primary_node] + list(self.instance.secondary_nodes)
4366 def CheckPrereq(self):
4367 """Check prerequisites.
4369 This only checks the instance list against the existing names.
4372 self.mem = getattr(self.op, "mem", None)
4373 self.vcpus = getattr(self.op, "vcpus", None)
4374 self.ip = getattr(self.op, "ip", None)
4375 self.mac = getattr(self.op, "mac", None)
4376 self.bridge = getattr(self.op, "bridge", None)
4377 self.kernel_path = getattr(self.op, "kernel_path", None)
4378 self.initrd_path = getattr(self.op, "initrd_path", None)
4379 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4380 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4381 self.kernel_path, self.initrd_path, self.hvm_boot_order]
4382 if all_parms.count(None) == len(all_parms):
4383 raise errors.OpPrereqError("No changes submitted")
4384 if self.mem is not None:
4386 self.mem = int(self.mem)
4387 except ValueError, err:
4388 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4389 if self.vcpus is not None:
4391 self.vcpus = int(self.vcpus)
4392 except ValueError, err:
4393 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4394 if self.ip is not None:
4396 if self.ip.lower() == "none":
4399 if not utils.IsValidIP(self.ip):
4400 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4403 self.do_bridge = (self.bridge is not None)
4404 if self.mac is not None:
4405 if self.cfg.IsMacInUse(self.mac):
4406 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4408 if not utils.IsValidMac(self.mac):
4409 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4411 if self.kernel_path is not None:
4412 self.do_kernel_path = True
4413 if self.kernel_path == constants.VALUE_NONE:
4414 raise errors.OpPrereqError("Can't set instance to no kernel")
4416 if self.kernel_path != constants.VALUE_DEFAULT:
4417 if not os.path.isabs(self.kernel_path):
4418 raise errors.OpPrereqError("The kernel path must be an absolute"
4421 self.do_kernel_path = False
4423 if self.initrd_path is not None:
4424 self.do_initrd_path = True
4425 if self.initrd_path not in (constants.VALUE_NONE,
4426 constants.VALUE_DEFAULT):
4427 if not os.path.isabs(self.initrd_path):
4428 raise errors.OpPrereqError("The initrd path must be an absolute"
4431 self.do_initrd_path = False
4433 # boot order verification
4434 if self.hvm_boot_order is not None:
4435 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4436 if len(self.hvm_boot_order.strip("acdn")) != 0:
4437 raise errors.OpPrereqError("invalid boot order specified,"
4438 " must be one or more of [acdn]"
4441 instance = self.cfg.GetInstanceInfo(
4442 self.cfg.ExpandInstanceName(self.op.instance_name))
4443 if instance is None:
4444 raise errors.OpPrereqError("No such instance name '%s'" %
4445 self.op.instance_name)
4446 self.op.instance_name = instance.name
4447 self.instance = instance
  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # NOTE(review): this chunk is a lossy sample of the original file (the
    # interior line numbering jumps: 4457, 4460, 4463, ... are absent).  The
    # `result = []` accumulator, the per-parameter `if self.<param>:` guards
    # and the trailing `return result` are elided from this view -- confirm
    # against the full source before editing.
    instance = self.instance
    # each applied change is recorded as a (name, new_value) pair
    instance.memory = self.mem
    result.append(("mem", self.mem))
    instance.vcpus = self.vcpus
    result.append(("vcpus", self.vcpus))
    instance.nics[0].ip = self.ip
    result.append(("ip", self.ip))
    instance.nics[0].bridge = self.bridge
    result.append(("bridge", self.bridge))
    instance.nics[0].mac = self.mac
    result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        # "default" boot order is stored as None in the config
        instance.hvm_boot_order = None
        # NOTE(review): an "else:" branch is elided here (orig. line 4481)
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    # persist the modified instance object back into the cluster config
    self.cfg.AddInstance(instance)
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  # NOTE(review): lossy sample -- an _OP_REQP class attribute (the opcode
  # carries an optional "nodes" field, see CheckPrereq) appears elided here
  # (orig. lines 4492-4495); confirm against the full source.

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    # an empty/missing "nodes" field means "all nodes" to _GetWantedNodes
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  # NOTE(review): lossy sample -- the original file's line numbering jumps
  # inside this class; dict openings, `else:` branches, call continuations
  # and guards are elided below and flagged inline.  Confirm against the
  # full source before editing.
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    # NOTE(review): the `env = {` opening and leading entries are elided
    # here (orig. lines 4526-4528)
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    # NOTE(review): the `return env, nl, nl` line is elided (orig. 4535)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # resolve the target node name the same way as the instance name
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    # store back the canonical (expanded) name
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    # NOTE(review): a `snap_disks = []` initialisation (and possibly a try:
    # wrapper) is elided here (orig. lines 4572-4575)
    for disk in instance.disks:
      if disk.iv_name == "sda":
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
        if not new_dev_name:
          logger.Error("could not snapshot block device %s on node %s" %
                       (disk.logical_id[1], src_node))
        # NOTE(review): an `else:` branch appears elided here (orig. 4584);
        # new_dev is presumably only built when the snapshot succeeded
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                               logical_id=(vgname, new_dev_name),
                               physical_id=(vgname, new_dev_name),
                               iv_name=disk.iv_name)
        snap_disks.append(new_dev)

    # restart the instance if it was up before the (disk-preserving) shutdown
    if self.op.shutdown and instance.status == "up":
      if not rpc.call_instance_start(src_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
      # NOTE(review): call continuation elided (orig. 4601)
        logger.Error("could not export block device %s from node"
        # NOTE(review): format-string continuation elided (orig. 4603)
                     (dev.logical_id[1], src_node, dst_node.name))
      # best-effort cleanup of the snapshot on the source node
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    # NOTE(review): the guard (likely `if nodelist:`) is elided (orig. 4619)
    op = opcodes.OpQueryExports(nodes=nodelist)
    exportlist = self.proc.ChainOpCode(op)
    for node in exportlist:
      if instance.name in exportlist[node]:
        if not rpc.call_export_remove(node, instance.name):
          logger.Error("could not remove older export for instance %s"
                       " on node %s" % (instance.name, node))
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # nothing to check here: any instance name is acceptable since exports
    # for already-deleted instances must still be removable
    # NOTE(review): lossy sample -- the body (likely `pass`) is elided
    # (orig. lines 4637-4638)

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    # NOTE(review): a `fqdn_warn = False` initialisation appears elided
    # here (orig. 4647); `fqdn_warn` is read below
    if not instance_name:
      # NOTE(review): `fqdn_warn = True` appears elided here (orig. 4649)
      instance_name = self.op.instance_name

    # query all nodes (empty list is expanded to the whole cluster)
    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    # NOTE(review): a `found = False` initialisation appears elided here
    # (orig. 4654); `found` is read below
    for node in exportlist:
      if instance_name in exportlist[node]:
        # NOTE(review): `found = True` appears elided here (orig. 4657)
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  # NOTE(review): message continuation elided (orig. 4665)
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    Resolves self.target (cluster, node or instance object) from the
    (kind, name) pair carried by the opcode.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      # NOTE(review): lossy sample -- the guard (likely `if name is None:`)
      # is elided here (orig. 4682)
      raise errors.OpPrereqError("Invalid node name (%s)" %
      # NOTE(review): raise continuation elided (orig. 4684-4685)
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      # NOTE(review): the guard (likely `if name is None:`) is elided here
      # (orig. 4689)
      raise errors.OpPrereqError("Invalid instance name (%s)" %
      # NOTE(review): raise continuation elided (orig. 4691-4692)
      self.target = self.cfg.GetInstanceInfo(name)
    # NOTE(review): the final `else:` is elided here (orig. 4694)
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
      # NOTE(review): raise continuation elided (orig. 4696)
class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  The target object (cluster, node or instance) is resolved by the
  parent class' CheckPrereq from the "kind" and "name" opcode fields.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    target = self.target
    return target.GetTags()
class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    # NOTE(review): lossy sample -- a `try:` appears elided before the
    # compile (orig. lines 4722-4724)
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    # NOTE(review): a `cfg = self.cfg` binding appears elided here
    # (orig. 4733-4734); `cfg` is read below
    # build the list of (path, taggable-object) pairs to scan
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    # NOTE(review): a `results = []` initialisation appears elided here
    # (orig. 4740)
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    # NOTE(review): the `return results` line is elided (orig. 4745)
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    # parent resolves self.target from (kind, name)
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags on the target object.

    """
    # NOTE(review): lossy sample -- a `try:` appears elided before the loop
    # (orig. lines 4765-4768, which also held the original docstring)
    for tag in self.op.tags:
      self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    # NOTE(review): a `try:` appears elided before the config update
    # (orig. 4773)
    self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    # parent resolves self.target from (kind, name)
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    # every requested tag must currently be present on the target
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      # NOTE(review): lossy sample -- one statement is elided here
      # (orig. 4801, likely a `diff_names.sort()`)
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    # NOTE(review): a `try:` appears elided before the config update
    # (orig. 4811)
    self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """
    # NOTE(review): lossy sample -- lines are elided here (orig. 4831-4834);
    # they may have validated self.op.duration -- confirm against full source
    if self.op.on_nodes:
      # _GetWantedNodes also validates and expands the node names
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      # NOTE(review): a guard (likely `if not result:`) is elided before the
      # raise (orig. 4847)
      raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        # NOTE(review): a per-node failure guard is elided before the raise
        # (orig. 4850)
        raise errors.OpExecError("Failure during rpc call to node %s,"
                                 " result: %s" % (node, node_result))
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has three sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  # NOTE(review): lossy sample -- the original file's line numbering jumps
  # inside this class; literal openings, try:/else:/for headers and some
  # assignments are elided below and flagged inline.  Confirm against the
  # full source before editing.
  # NOTE(review): the list header (likely `_ALLO_KEYS = [`) is elided
  # (orig. 4868)
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
  # NOTE(review): the _RELO_KEYS definition is elided (orig. 4871-4875)

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    # NOTE(review): the `self.cfg = cfg` assignment is elided (orig. 4877)
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    # NOTE(review): mode/name assignments are elided (orig. 4882-4883);
    # self.mode and self.name are read below
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    # choose which key set the caller must supply, per mode
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    # NOTE(review): the `else:` is elided here (orig. 4895)
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    # NOTE(review): a loop header over kwargs is elided here (orig. 4898)
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    # NOTE(review): a loop header over keyset is elided here (orig. 4903)
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    # NOTE(review): a `cfg = self.cfg` binding and the opening of the `data`
    # dict are elided here (orig. 4913-4918); `cfg` and `data` are read below
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      # we don't have job IDs
    # node data
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      # every required attribute must be present and integer-convertible
      for attr in ['memory_total', 'memory_free',
                   'vg_size', 'vg_free']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
          # NOTE(review): raise continuation and a `try:` are elided here
          # (orig. 4937-4938)
          int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # NOTE(review): the opening of the per-node result dict (`pnr = {`) is
      # elided here (orig. 4943)
        "tags": list(ninfo.GetTags()),
        "total_memory": utils.TryConvert(int, remote_info['memory_total']),
        "free_memory": utils.TryConvert(int, remote_info['memory_free']),
        "total_disk": utils.TryConvert(int, remote_info['vg_size']),
        "free_disk": utils.TryConvert(int, remote_info['vg_free']),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    i_list = cfg.GetInstanceList()
    for iname in i_list:
      iinfo = cfg.GetInstanceInfo(iname)
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      # NOTE(review): the opening of the per-instance dict (`pir = {`) is
      # elided here (orig. 4962)
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
      instance_data[iname] = pir
    data["instances"] = instance_data
    # NOTE(review): a trailing assignment (likely `self.in_data = data`) is
    # elided (orig. 4976-4977)

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _AllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    # mirrored templates need a secondary node as well
    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    # NOTE(review): the `else:` is elided here (orig. 4998)
      self.required_nodes = 1
    # NOTE(review): the opening of the `request` dict is elided here
    # (orig. 5000-5002)
      "disk_template": self.disk_template,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _IAllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    # NOTE(review): the opening of the `request` dict is elided here
    # (orig. 5042-5044)
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    # NOTE(review): the `else:` is elided here (orig. 5059)
      self._AddRelocateInstance()

    # serialize the in-memory structure for the external script
    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True):
    """Run an instance allocator and return the results.

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
    # NOTE(review): the FindFile call continuation is elided (orig. 5071)
    if alloc_script is None:
      raise errors.OpExecError("Can't find allocator '%s'" % name)

    # the input is handed to the script via a temporary file
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    # NOTE(review): the code writing self.in_text to fd and closing it is
    # elided (orig. 5076-5078)
    result = utils.RunCmd([alloc_script, fin_name])
    # NOTE(review): the failure guard (likely `if result.failed:`) is elided
    # (orig. 5080)
      raise errors.OpExecError("Instance allocator call failed: %s,"
      # NOTE(review): the format-string continuation is elided (orig. 5082)
                               (result.fail_reason, result.output))
    self.out_text = result.stdout
    # NOTE(review): an `if validate:` guard is elided here (orig. 5087)
    self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    # NOTE(review): a `try:` is elided here (orig. 5097)
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    # the three well-known result keys are mirrored onto self
    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
      # NOTE(review): the message continuation is elided (orig. 5113)
    self.out_data = rdict
5117 class LUTestAllocator(NoHooksLU):
5118 """Run allocator tests.
5120 This LU runs the allocator tests
5123 _OP_REQP = ["direction", "mode", "name"]
5125 def CheckPrereq(self):
5126 """Check prerequisites.
5128 This checks the opcode parameters depending on the director and mode test.
5131 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5132 for attr in ["name", "mem_size", "disks", "disk_template",
5133 "os", "tags", "nics", "vcpus"]:
5134 if not hasattr(self.op, attr):
5135 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5137 iname = self.cfg.ExpandInstanceName(self.op.name)
5138 if iname is not None:
5139 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5141 if not isinstance(self.op.nics, list):
5142 raise errors.OpPrereqError("Invalid parameter 'nics'")
5143 for row in self.op.nics:
5144 if (not isinstance(row, dict) or
5147 "bridge" not in row):
5148 raise errors.OpPrereqError("Invalid contents of the"
5149 " 'nics' parameter")
5150 if not isinstance(self.op.disks, list):
5151 raise errors.OpPrereqError("Invalid parameter 'disks'")
5152 if len(self.op.disks) != 2:
5153 raise errors.OpPrereqError("Only two-disk configurations supported")
5154 for row in self.op.disks:
5155 if (not isinstance(row, dict) or
5156 "size" not in row or
5157 not isinstance(row["size"], int) or
5158 "mode" not in row or
5159 row["mode"] not in ['r', 'w']):
5160 raise errors.OpPrereqError("Invalid contents of the"
5161 " 'disks' parameter")
5162 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5163 if not hasattr(self.op, "name"):
5164 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5165 fname = self.cfg.ExpandInstanceName(self.op.name)
5167 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5169 self.op.name = fname
5170 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5172 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5175 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5176 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5177 raise errors.OpPrereqError("Missing allocator name")
5178 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5179 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5182 def Exec(self, feedback_fn):
5183 """Run the allocator test.
5186 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5187 ial = IAllocator(self.cfg, self.sstore,
5190 mem_size=self.op.mem_size,
5191 disks=self.op.disks,
5192 disk_template=self.op.disk_template,
5196 vcpus=self.op.vcpus,
5199 ial = IAllocator(self.cfg, self.sstore,
5202 relocate_from=list(self.relocate_from),
5205 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5206 result = ial.in_text
5208 ial.Run(self.op.allocator, validate=False)
5209 result = ial.out_text