# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201

# standard library modules used by the code below
import os
import re
import sha
import time
import tempfile
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
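
  # Illustrative sketch (not part of the original code): a minimal
  # subclass following the rules above might look roughly like this,
  # where LUDoSomething and the opcode field "some_param" are made-up
  # names:
  #
  #   class LUDoSomething(LogicalUnit):
  #     HPATH = "do-something"
  #     HTYPE = constants.HTYPE_CLUSTER
  #     _OP_REQP = ["some_param"]
  #
  #     def BuildHooksEnv(self):
  #       env = {"OP_TARGET": self.op.some_param}
  #       return env, [], [self.sstore.GetMasterNode()]
  #
  #     def CheckPrereq(self):
  #       pass  # validate self.op here, raising errors.OpPrereqError
  #
  #     def Exec(self, feedback_fn):
  #       pass  # do the actual work here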

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if not cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    master = sstore.GetMasterNode()
    if master != utils.HostInfo().name:
      raise errors.OpPrereqError("Commands must be run on the master"
                                 " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a
    dict containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in them,
    as it will be added by the hooks runner in case this LU requires a
    cluster to run on (otherwise we don't have a node list). If there
    are no nodes, an empty list (and not None) should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    return {}, [], []


def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
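
# Illustrative example (made-up host data): for a node that resolves as
# "node1.example.com" with IP 192.0.2.10, the call above produces an
# /etc/hosts entry equivalent to:
#
#   192.0.2.10 node1.example.com node1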


def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []
    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)
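
# Illustrative usage (mirrors the query LUs below): a CheckPrereq method
# typically calls
#
#   self.nodes = _GetWantedNodes(self, self.op.nodes)
#
# so that short node names are expanded and the result is nicely sorted.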


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
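
# Illustrative usage (field names are only examples): a query LU validates
# the user-requested columns with
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=self.op.output_fields)
#
# which raises OpPrereqError naming any unknown fields.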


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  nic_count = len(nics)
  for idx, (ip, bridge, mac) in enumerate(nics):
    if ip is None:
      ip = ""
    env["INSTANCE_NIC%d_IP" % idx] = ip
    env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
    env["INSTANCE_NIC%d_HWADDR" % idx] = mac

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
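
# Illustrative usage (mirrors LUStartupInstance below): hook environments
# are normally built from the instance object and merged with extra keys:
#
#   env = {"FORCE": self.op.force}
#   env.update(_BuildInstanceHookEnvByObject(self.instance))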


def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip - IPv4 address of host (str)
    pubkey - the public key of the cluster

  """
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
  else:
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')

  inthere = False
  save_lines = []
  add_lines = []
  removed = False

  for rawline in f:
    logger.Debug('read %s' % (repr(rawline),))
    parts = rawline.rstrip('\r\n').split()

    # Ignore unwanted lines
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
      fields = parts[0].split(',')
      key = parts[2]
      haveall = True
      havesome = False
      for spec in [ ip, fullnode ]:
        if spec not in fields:
          haveall = False
        if spec in fields:
          havesome = True

      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
      if haveall and key == pubkey:
        inthere = True
        save_lines.append(rawline)
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
        continue
      if havesome and (not haveall or key != pubkey):
        removed = True
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
        continue

    save_lines.append(rawline)

  if not inthere:
    add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
    logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))

  if removed:
    save_lines = save_lines + add_lines

    # Write a new file and replace old.
    fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
                                   constants.DATA_DIR)
    newfile = os.fdopen(fd, 'w')
    try:
      newfile.write(''.join(save_lines))
    finally:
      newfile.close()
    logger.Debug("Wrote new known_hosts.")
    os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
  elif add_lines:
    # Simply appending a new line will do the trick.
    f.seek(0, 2)
    for add in add_lines:
      f.write(add)

  f.close()


def _HasValidVG(vglist, vgname):
  """Checks if the volume group list is valid.

  A non-None return value means there's an error, and the return value
  is the error message.

  """
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
  return None


def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  f = open(pub_key, 'r')
  try:
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()


def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  """
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]
  REQ_CLUSTER = False

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   " password file %s" %
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
                     timeout=5):
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
    if (secondary_ip and
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
                                 secondary_ip)
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)

    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    try:
      sshline = f.read()
    finally:
      f.close()
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  """
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and
          # each value is a list of instances which have the key as primary
          # and the current node as secondary. This is handy to calculate
          # N+1 memory availability if you can only failover from a primary
          # to its secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return int(bad)


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
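
# Illustrative usage (assumed context): after assembling mirrored disks, a
# caller would poll until replication catches up, along the lines of
#
#   if not _WaitForSync(self.cfg, instance, self.proc):
#     raise errors.OpExecError("Disk sync-out failed")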


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  for child in dev.children:
    result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result
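
# Illustrative usage (assumed context): a failover-style check would walk
# an instance's disks on the source node with something like
#
#   for dev in instance.disks:
#     if not _CheckDiskConsistency(self.cfg, dev, source_node, False):
#       raise errors.OpExecError("Disk %s is degraded!" % dev.iv_name)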


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")

    return node_data


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the node exists in the configuration
    - it does not have primary or secondary instances
    - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    _RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
                                     "bootid"])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of volumes on the selected nodes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the new node is not already in the config
    - it is resolvable
    - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"
                                 % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up the inter-node password and certificate and restart the
    # node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]
    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])
    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    data = []
    for node in self.nodes:
      result = ssh.SSHCall(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node cause the
  function to return failure; otherwise they are ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
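
# Illustrative usage (mirrors LUStartupInstance below): before starting an
# instance, its primary node is checked with
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)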
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")

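# Hypothetical client-side sketch (the opcode and processor names here are
# assumptions inferred from _OP_REQP and Exec above, not confirmed API):
#
#   op = opcodes.OpStartupInstance(instance_name="web1.example.com",
#                                  force=False, extra_args="")
#   processor.ExecOpCode(op, feedback_fn)
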
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)

class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)

class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)

class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)

class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)

class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output

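# Example result (illustrative): for output_fields=["name", "pnode",
# "oper_ram"], Exec returns one row per instance with values in the
# requested order, e.g. [["web1.example.com", "node1.example.com", 512]];
# "?" stands in for a missing memory figure and None marks a field that
# could not be computed because the primary node was unreachable.
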
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))

def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True

def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True

def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names for the given extensions.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results

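# Example (hypothetical IDs): with exts=[".sda", ".sdb"], each extension
# gets its own freshly generated unique ID, so the result looks like
# ["4ba7abb6.sda", "81f0a572.sdb"].
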
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta])
  return drbd_dev

def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev

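# Resulting device tree (sketch; sizes in MiB):
#
#   DRBD8 device, iv_name as given, logical_id=(primary, secondary, port)
#     data child: LV of the requested size
#     meta child: LV of 128 MiB for DRBD metadata
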
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_LOCAL_RAID1:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
                                       ".sdb_m1", ".sdb_m2"])
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[0]))
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[1]))
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              size=disk_sz,
                              children=[sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[2]))
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[3]))
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              size=swap_sz,
                              children=[sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_REMOTE_RAID1:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2])
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              children=[drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4])
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              children=[drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks

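# Illustrative call (hypothetical arguments): the drbd8 layout for an
# instance with a 10240 MiB disk and 4096 MiB swap would be built as
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_DRBD8,
#                                 "web1.example.com", "node1.example.com",
#                                 ["node2.example.com"], 10240, 4096)
#
# giving two DRBD8 devices, "sda" (10240 MiB) and "sdb" (4096 MiB).
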
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name

def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False

    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True

def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result

def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group.

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
    constants.DT_DRBD8: disk_size + swap_size + 256,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]

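# Worked example (informative): for DT_DRBD8 with disk_size=10240 and
# swap_size=4096, the volume group needs 10240 + 4096 + 256 = 14592 MiB,
# the extra 256 MiB covering the two 128 MiB DRBD metadata volumes.
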
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     mode=constants.IALLOCATOR_MODE_ALLOC)

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      req_nodes = 2
    else:
      req_nodes = 1
    if len(ial.nodes) != req_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  req_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if req_nodes == 2:
      self.op.snode = ial.nodes[1]

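  # Sketch of the allocator input assembled above (values hypothetical):
  #
  #   disks = [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}]
  #   nics  = [{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
  #
  # For a DTS_NET_MIRROR template the allocator must name two nodes
  # (primary and secondary); any other template needs exactly one.
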
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
                                     primary_node=self.op.pnode,
                                     secondary_nodes=self.secondaries,
                                     status=self.instance_status,
                                     os_type=self.op.os_type,
                                     memory=self.op.mem_size,
                                     vcpus=self.op.vcpus,
                                     nics=[(self.inst_ip, self.op.bridge,
                                            self.op.mac)]))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old OSes, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > vg_free:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, vg_free, req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")

class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't contact node '%s'" % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)
    # build ssh cmdline
    argv = ["ssh", "-q", "-t"]
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
    argv.extend(ssh.BATCH_MODE_OPTS)
    argv.append(node)
    argv.append(console_cmd)
    return argv

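  # Example of the returned command line (node and console command are
  # hypothetical; the option lists expand to the ssh module's defaults):
  #
  #   ["ssh", "-q", "-t"] + KNOWN_HOSTS_OPTS + BATCH_MODE_OPTS
  #     + ["node1.example.com", "xm console web1.example.com"]
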
class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave devices."
                                 " This would create a 3-disk raid1 which we"
                                 " don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component.

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)

    logger.Info("adding new mirror component on secondary")
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
                                      new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
                                    instance, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchildren(instance.primary_node,
                                         disk, [new_drbd]):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance, self.proc)

class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if (child.dev_type == constants.LD_DRBD7 and
          child.logical_id[2] == self.op.disk_id):
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component.

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechildren(instance.primary_node,
                                            disk, [child]):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)

class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replace secondary")
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
      remote_node = None
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    if self.op.remote_node is None:
      remote_node = self.sec_node
    else:
      remote_node = self.op.remote_node
    cfg = self.cfg
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      instance, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
                                           [new_drbd]):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
                                              dev, [child]):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)

3840 def _ExecD8DiskOnly(self, feedback_fn):
3841 """Replace a disk on the primary or secondary for dbrd8.
3843 The algorithm for replace is quite complicated:
3844 - for each disk to be replaced:
3845 - create new LVs on the target node with unique names
3846 - detach old LVs from the drbd device
3847 - rename old LVs to name_replaced.<time_t>
3848 - rename new LVs to old LVs
3849 - attach the new LVs (with the old names now) to the drbd device
3850 - wait for sync across all devices
3851 - for each modified disk:
3852 - remove old LVs (which have the name name_replaces.<time_t>)
3854 Failures are not very well handled.
3858 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3859 instance = self.instance
3861 vgname = self.cfg.GetVGName()
3864 tgt_node = self.tgt_node
3865 oth_node = self.oth_node
3867 # Step: check device activation
3868 self.proc.LogStep(1, steps_total, "check device existence")
3869 info("checking volume groups")
3870 my_vg = cfg.GetVGName()
3871 results = rpc.call_vg_list([oth_node, tgt_node])
3873 raise errors.OpExecError("Can't list volume groups on the nodes")
3874 for node in oth_node, tgt_node:
3875 res = results.get(node, False)
3876 if not res or my_vg not in res:
3877 raise errors.OpExecError("Volume group '%s' not found on %s" %
3879 for dev in instance.disks:
3880 if not dev.iv_name in self.op.disks:
3882 for node in tgt_node, oth_node:
3883 info("checking %s on %s" % (dev.iv_name, node))
3884 cfg.SetDiskID(dev, node)
3885 if not rpc.call_blockdev_find(node, dev):
3886 raise errors.OpExecError("Can't find device %s on node %s" %
3887 (dev.iv_name, node))
3889 # Step: check other node consistency
3890 self.proc.LogStep(2, steps_total, "check peer consistency")
3891 for dev in instance.disks:
3892 if not dev.iv_name in self.op.disks:
3894 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3895 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3896 oth_node==instance.primary_node):
3897 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3898 " to replace disks on this node (%s)" %
3899 (oth_node, tgt_node))
3901 # Step: create new storage
3902 self.proc.LogStep(3, steps_total, "allocate new storage")
3903 for dev in instance.disks:
3904 if not dev.iv_name in self.op.disks:
3907 cfg.SetDiskID(dev, tgt_node)
3908 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3909 names = _GenerateUniqueNames(cfg, lv_names)
3910 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3911 logical_id=(vgname, names[0]))
3912 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3913 logical_id=(vgname, names[1]))
3914 new_lvs = [lv_data, lv_meta]
3915 old_lvs = dev.children
3916 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3917 info("creating new local storage on %s for %s" %
3918 (tgt_node, dev.iv_name))
3919 # since we *always* want to create this LV, we use the
3920 # _Create...OnPrimary (which forces the creation), even if we
3921 # are talking about the secondary node
3922 for new_lv in new_lvs:
3923 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3924 _GetInstanceInfoText(instance)):
3925 raise errors.OpExecError("Failed to create new LV named '%s' on"
3927 (new_lv.logical_id[1], tgt_node))
3929 # Step: for each lv, detach+rename*2+attach
3930 self.proc.LogStep(4, steps_total, "change drbd configuration")
3931 for dev, old_lvs, new_lvs in iv_names.itervalues():
3932 info("detaching %s drbd from local storage" % dev.iv_name)
3933 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3934 raise errors.OpExecError("Can't detach drbd from local storage on node"
3935 " %s for device %s" % (tgt_node, dev.iv_name))
3937 #cfg.Update(instance)
3939 # ok, we created the new LVs, so now we know we have the needed
3940 # storage; as such, we proceed on the target node to rename
3941 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3942 # using the assumption that logical_id == physical_id (which in
3943 # turn is the unique_id on that node)
3945 # FIXME(iustin): use a better name for the replaced LVs
3946 temp_suffix = int(time.time())
3947 ren_fn = lambda d, suff: (d.physical_id[0],
3948 d.physical_id[1] + "_replaced-%s" % suff)
3949 # build the rename list based on what LVs exist on the node
3951 for to_ren in old_lvs:
3952 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3953 if find_res is not None: # device exists
3954 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3956 info("renaming the old LVs on the target node")
3957 if not rpc.call_blockdev_rename(tgt_node, rlist):
3958 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3959 # now we rename the new LVs to the old LVs
3960 info("renaming the new LVs on the target node")
3961 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3962 if not rpc.call_blockdev_rename(tgt_node, rlist):
3963 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3965 for old, new in zip(old_lvs, new_lvs):
3966 new.logical_id = old.logical_id
3967 cfg.SetDiskID(new, tgt_node)
3969 for disk in old_lvs:
3970 disk.logical_id = ren_fn(disk, temp_suffix)
3971 cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s" % new_lv.logical_id[1],
                    hint="manually cleanup unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    if instance.disk_template == constants.DT_REMOTE_RAID1:
      fn = self._ExecRR1
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        "hvm_boot_order": instance.hvm_boot_order,
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
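    # the RPC result is a dict like (illustrative hostnames):
    #   {"node1.example.com": ["instance1.example.com"]}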
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """
    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _KEYS = [
    "mode", "name",
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]

  def __init__(self, cfg, sstore, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = self.name = None
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    # init result fields
    self.success = self.info = self.nodes = None
    for key in kwargs:
      if key not in self._KEYS:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in self._KEYS:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
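
  # a minimal use sketch (argument values are illustrative only):
  #   ial = IAllocator(cfg, sstore, mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com", mem_size=128, disks=[...],
  #                    disk_template=..., os=..., tags=[], nics=[...], vcpus=1)
  #   ial.Run("my-allocator")  # hypothetical script name
  #   if ial.success: print ial.nodes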

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      # we don't have job IDs
      }

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free',
                   'vg_size', 'vg_free']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": utils.TryConvert(int, remote_info['memory_total']),
        "free_memory": utils.TryConvert(int, remote_info['memory_free']),
        "total_disk": utils.TryConvert(int, remote_info['vg_size']),
        "free_disk": utils.TryConvert(int, remote_info['vg_free']),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    i_list = cfg.GetInstanceList()
    for iname in i_list:
      iinfo = cfg.GetInstanceInfo(iname)
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iname] = pir
    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      }
    data["request"] = request
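    # the request dict is merged into the cluster description built by
    # _ComputeClusterData, so the external script receives both as one
    # document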

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    request = {
      "type": "replace_secondary",
      "name": self.name,
      }
    data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
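    # in_text is the serialized form that Run() below hands to the
    # external allocator script via a temporary file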

  def Run(self, name, validate=True):
    """Run an instance allocator and return the results.

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      raise errors.OpExecError("Can't find allocator '%s'" % name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, self.in_text)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        raise errors.OpExecError("Instance allocator call failed: %s,"
                                 " output: %s" %
                                 (result.fail_reason, result.stdout))
    finally:
      os.unlink(fin_name)

    self.out_text = result.stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " not a list")
    self.out_data = rdict
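    # at this point a well-formed reply has been parsed; in text form it
    # looks like (illustrative values):
    #   {"success": true, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}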


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=self.op.mode,
                     name=self.op.name,
                     mem_size=self.op.mem_size,
                     disks=self.op.disks,
                     disk_template=self.op.disk_template,
                     os=self.op.os,
                     tags=self.op.tags,
                     nics=self.op.nics,
                     vcpus=self.op.vcpus,
                     )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result