4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement CheckPrereq which also fills in the opcode instance
53 with all the fields (even if as None)
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements (REQ_CLUSTER,
58 REQ_MASTER); note that all commands require root permissions
67 def __init__(self, processor, op, cfg, sstore):
68 """Constructor for LogicalUnit.
70 This needs to be overridden in derived classes in order to check op validity.
78 for attr_name in self._OP_REQP:
79 attr_val = getattr(op, attr_name, None)
81 raise errors.OpPrereqError("Required parameter '%s' missing" %
84 if not cfg.IsCluster():
85 raise errors.OpPrereqError("Cluster not initialized yet,"
86 " use 'gnt-cluster init' first.")
88 master = sstore.GetMasterNode()
89 if master != utils.HostInfo().name:
90 raise errors.OpPrereqError("Commands must be run on the master"
93 def CheckPrereq(self):
94 """Check prerequisites for this LU.
96 This method should check that the prerequisites for the execution
97 of this LU are fulfilled. It can do internode communication, but
98 it should be idempotent - no cluster or system changes are
101 The method should raise errors.OpPrereqError in case something is
102 not fulfilled. Its return value is ignored.
104 This method should also update all the parameters of the opcode to
105 their canonical form; e.g. a short node name must be fully
106 expanded after this method has successfully completed (so that
107 hooks, logging, etc. work correctly).
110 raise NotImplementedError
112 def Exec(self, feedback_fn):
115 This method should implement the actual work. It should raise
116 errors.OpExecError for failures that are somewhat dealt with in
120 raise NotImplementedError
122 def BuildHooksEnv(self):
123 """Build hooks environment for this LU.
125 This method should return a three-element tuple consisting of: a dict
126 containing the environment that will be used for running the
127 specific hook for this LU, a list of node names on which the hook
128 should run before the execution, and a list of node names on which
129 the hook should run after the execution.
131 The keys of the dict must not have 'GANETI_' prefixed as this will
132 be handled in the hooks runner. Also note additional keys will be
133 added by the hooks runner. If the LU doesn't define any
134 environment, an empty dict (and not None) should be returned.
136 As for the node lists, the master should not be included in
137 them, as it will be added by the hooks runner in case this LU
138 requires a cluster to run on (otherwise we don't have a node
139 list). An empty node list should be returned as an empty list (and not None).
142 Note that if the HPATH for a LU class is None, this function will not be called.
146 raise NotImplementedError
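# Illustrative sketch only (not part of the original code): a minimal
# LogicalUnit subclass following the rules described above.  The opcode and
# its "message" field are hypothetical and exist only for this example.
#
#   class LUExampleNoop(LogicalUnit):
#     HPATH = "example-noop"
#     HTYPE = constants.HTYPE_CLUSTER
#     _OP_REQP = ["message"]
#
#     def BuildHooksEnv(self):
#       env = {"OP_TARGET": self.op.message}
#       return env, [], []
#
#     def CheckPrereq(self):
#       if not self.op.message:
#         raise errors.OpPrereqError("Empty message given")
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Message: %s" % self.op.message)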
149 class NoHooksLU(LogicalUnit):
150 """Simple LU which runs no hooks.
152 This LU is intended as a parent for other LogicalUnits which will
153 run no hooks, in order to reduce duplicate code.
159 def BuildHooksEnv(self):
162 This is a no-op, since we don't run hooks.
168 def _AddHostToEtcHosts(hostname):
169 """Wrapper around utils.SetEtcHostsEntry.
172 hi = utils.HostInfo(name=hostname)
173 utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
176 def _RemoveHostFromEtcHosts(hostname):
177 """Wrapper around utils.RemoveEtcHostsEntry.
180 hi = utils.HostInfo(name=hostname)
181 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
182 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
185 def _GetWantedNodes(lu, nodes):
186 """Returns list of checked and expanded node names.
189 nodes: List of nodes (strings) or None for all
192 if not isinstance(nodes, list):
193 raise errors.OpPrereqError("Invalid argument type 'nodes'")
199 node = lu.cfg.ExpandNodeName(name)
201 raise errors.OpPrereqError("No such node name '%s'" % name)
205 wanted = lu.cfg.GetNodeList()
206 return utils.NiceSort(wanted)
209 def _GetWantedInstances(lu, instances):
210 """Returns list of checked and expanded instance names.
213 instances: List of instances (strings) or None for all
216 if not isinstance(instances, list):
217 raise errors.OpPrereqError("Invalid argument type 'instances'")
222 for name in instances:
223 instance = lu.cfg.ExpandInstanceName(name)
225 raise errors.OpPrereqError("No such instance name '%s'" % name)
226 wanted.append(instance)
229 wanted = lu.cfg.GetInstanceList()
230 return utils.NiceSort(wanted)
233 def _CheckOutputFields(static, dynamic, selected):
234 """Checks whether all selected fields are valid.
237 static: Static fields
238 dynamic: Dynamic fields
241 static_fields = frozenset(static)
242 dynamic_fields = frozenset(dynamic)
244 all_fields = static_fields | dynamic_fields
246 if not all_fields.issuperset(selected):
247 raise errors.OpPrereqError("Unknown output fields selected: %s"
248 % ",".join(frozenset(selected).
249 difference(all_fields)))
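# Usage sketch (illustrative field names): selecting a field outside the
# union of the static and dynamic sets raises OpPrereqError.
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "mfree"])   # passes
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "bogus"])   # raises OpPrereqError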
252 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
253 memory, vcpus, nics):
254 """Builds instance related env variables for hooks from single variables.
257 secondary_nodes: List of secondary nodes as strings
261 "INSTANCE_NAME": name,
262 "INSTANCE_PRIMARY": primary_node,
263 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
264 "INSTANCE_OS_TYPE": os_type,
265 "INSTANCE_STATUS": status,
266 "INSTANCE_MEMORY": memory,
267 "INSTANCE_VCPUS": vcpus,
271 nic_count = len(nics)
272 for idx, (ip, bridge, mac) in enumerate(nics):
275 env["INSTANCE_NIC%d_IP" % idx] = ip
276 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
277 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
281 env["INSTANCE_NIC_COUNT"] = nic_count
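# For illustration, a one-NIC instance yields an environment roughly like the
# following (all values are made up; the hooks runner later prefixes each key
# with 'GANETI_'):
#
#   {"INSTANCE_NAME": "inst1", "INSTANCE_PRIMARY": "node1",
#    "INSTANCE_SECONDARIES": "node2", "INSTANCE_OS_TYPE": "debian-etch",
#    "INSTANCE_STATUS": "up", "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "192.0.2.10", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33", "INSTANCE_NIC_COUNT": 1}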
286 def _BuildInstanceHookEnvByObject(instance, override=None):
287 """Builds instance related env variables for hooks from an object.
290 instance: objects.Instance object of instance
291 override: dict of values to override
294 'name': instance.name,
295 'primary_node': instance.primary_node,
296 'secondary_nodes': instance.secondary_nodes,
297 'os_type': instance.os,
298 'status': instance.status,
299 'memory': instance.memory,
300 'vcpus': instance.vcpus,
301 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
304 args.update(override)
305 return _BuildInstanceHookEnv(**args)
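# Usage sketch (illustrative): the override dict replaces selected entries
# before the environment is built, e.g. to describe a planned rather than the
# current placement (target_node is a hypothetical variable):
#
#   env = _BuildInstanceHookEnvByObject(instance,
#                                       override={'primary_node': target_node})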
308 def _UpdateKnownHosts(fullnode, ip, pubkey):
309 """Ensure a node has a correct known_hosts entry.
312 fullnode - Fully qualified domain name of host. (str)
313 ip - IPv4 address of host (str)
314 pubkey - the public key of the cluster
317 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
318 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
320 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
329 logger.Debug('read %s' % (repr(rawline),))
331 parts = rawline.rstrip('\r\n').split()
333 # Ignore unwanted lines
334 if len(parts) >= 3 and rawline.lstrip()[0] != '#':
335 fields = parts[0].split(',')
340 for spec in [ ip, fullnode ]:
341 if spec not in fields:
346 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
347 if haveall and key == pubkey:
349 save_lines.append(rawline)
350 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
353 if havesome and (not haveall or key != pubkey):
355 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
358 save_lines.append(rawline)
361 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
362 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
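# The appended entry uses the standard known_hosts format, for example
# (illustrative host name, address and key):
#
#   node1.example.com,192.0.2.1 ssh-rsa AAAAB3NzaC1yc2E...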
365 save_lines = save_lines + add_lines
367 # Write a new file and replace old.
368 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
370 newfile = os.fdopen(fd, 'w')
372 newfile.write(''.join(save_lines))
375 logger.Debug("Wrote new known_hosts.")
376 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
379 # Simply appending a new line will do the trick.
381 for add in add_lines:
387 def _HasValidVG(vglist, vgname):
388 """Checks if the volume group list is valid.
390 A non-None return value means there's an error, and the return value
391 is the error message.
394 vgsize = vglist.get(vgname, None)
396 return "volume group '%s' missing" % vgname
398 return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
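# Usage sketch (illustrative values): vglist maps volume group names to their
# size in MiB, as returned by utils.ListVolumeGroups().
#
#   _HasValidVG({"xenvg": 409600}, "xenvg")  # -> None (valid)
#   _HasValidVG({"xenvg": 4096}, "xenvg")    # -> "volume group ... too small"
#   _HasValidVG({}, "xenvg")                 # -> "volume group 'xenvg' missing"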
403 def _InitSSHSetup(node):
404 """Setup the SSH configuration for the cluster.
407 This generates a dsa keypair for root, adds the pub key to the
408 permitted hosts and adds the hostkey to its own known hosts.
411 node: the name of this host as a fqdn
414 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
416 for name in priv_key, pub_key:
417 if os.path.exists(name):
418 utils.CreateBackup(name)
419 utils.RemoveFile(name)
421 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
425 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
428 f = open(pub_key, 'r')
430 utils.AddAuthorizedKey(auth_keys, f.read(8192))
435 def _InitGanetiServerSetup(ss):
436 """Setup the necessary configuration for the initial node daemon.
438 This creates the nodepass file containing the shared password for
439 the cluster and also generates the SSL certificate.
442 # Create pseudo random password
443 randpass = sha.new(os.urandom(64)).hexdigest()
444 # and write it into sstore
445 ss.SetKey(ss.SS_NODED_PASS, randpass)
447 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
448 "-days", str(365*5), "-nodes", "-x509",
449 "-keyout", constants.SSL_CERT_FILE,
450 "-out", constants.SSL_CERT_FILE, "-batch"])
452 raise errors.OpExecError("could not generate server ssl cert, command"
453 " %s had exitcode %s and error message %s" %
454 (result.cmd, result.exit_code, result.output))
456 os.chmod(constants.SSL_CERT_FILE, 0400)
458 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
461 raise errors.OpExecError("Could not start the node daemon, command %s"
462 " had exitcode %s and error %s" %
463 (result.cmd, result.exit_code, result.output))
466 def _CheckInstanceBridgesExist(instance):
467 """Check that the bridges needed by an instance exist.
470 # check bridges existence
471 brlist = [nic.bridge for nic in instance.nics]
472 if not rpc.call_bridges_exist(instance.primary_node, brlist):
473 raise errors.OpPrereqError("one or more target bridges %s do not"
474 " exist on destination node '%s'" %
475 (brlist, instance.primary_node))
478 class LUInitCluster(LogicalUnit):
479 """Initialise the cluster.
482 HPATH = "cluster-init"
483 HTYPE = constants.HTYPE_CLUSTER
484 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
485 "def_bridge", "master_netdev"]
488 def BuildHooksEnv(self):
491 Notes: Since we don't require a cluster, we must manually add
492 ourselves in the post-run node list.
495 env = {"OP_TARGET": self.op.cluster_name}
496 return env, [], [self.hostname.name]
498 def CheckPrereq(self):
499 """Verify that the passed name is a valid one.
502 if config.ConfigWriter.IsCluster():
503 raise errors.OpPrereqError("Cluster is already initialised")
505 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
506 if not os.path.exists(constants.VNC_PASSWORD_FILE):
507 raise errors.OpPrereqError("Please prepare the cluster VNC"
509 constants.VNC_PASSWORD_FILE)
511 self.hostname = hostname = utils.HostInfo()
513 if hostname.ip.startswith("127."):
514 raise errors.OpPrereqError("This host's IP resolves to the private"
515 " range (%s). Please fix DNS or %s." %
516 (hostname.ip, constants.ETC_HOSTS))
518 if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
519 source=constants.LOCALHOST_IP_ADDRESS):
520 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
521 " to %s,\nbut this ip address does not"
522 " belong to this host."
523 " Aborting." % hostname.ip)
525 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
527 if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
529 raise errors.OpPrereqError("Cluster IP already active. Aborting.")
531 secondary_ip = getattr(self.op, "secondary_ip", None)
532 if secondary_ip and not utils.IsValidIP(secondary_ip):
533 raise errors.OpPrereqError("Invalid secondary ip given")
535 secondary_ip != hostname.ip and
536 (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
537 source=constants.LOCALHOST_IP_ADDRESS))):
538 raise errors.OpPrereqError("You gave %s as secondary IP,"
539 " but it does not belong to this host." %
541 self.secondary_ip = secondary_ip
543 # checks presence of the volume group given
544 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
547 raise errors.OpPrereqError("Error: %s" % vgstatus)
549 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
551 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
554 if self.op.hypervisor_type not in constants.HYPER_TYPES:
555 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
556 self.op.hypervisor_type)
558 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
560 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
561 (self.op.master_netdev,
562 result.output.strip()))
564 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
565 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
566 raise errors.OpPrereqError("Init.d script '%s' missing or not"
567 " executable." % constants.NODE_INITD_SCRIPT)
569 def Exec(self, feedback_fn):
570 """Initialize the cluster.
573 clustername = self.clustername
574 hostname = self.hostname
576 # set up the simple store
577 self.sstore = ss = ssconf.SimpleStore()
578 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
579 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
580 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
581 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
582 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
584 # set up the inter-node password and certificate
585 _InitGanetiServerSetup(ss)
587 # start the master ip
588 rpc.call_node_start_master(hostname.name)
590 # set up ssh config and /etc/hosts
591 f = open(constants.SSH_HOST_RSA_PUB, 'r')
596 sshkey = sshline.split(" ")[1]
598 _AddHostToEtcHosts(hostname.name)
600 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
602 _InitSSHSetup(hostname.name)
604 # init of cluster config file
605 self.cfg = cfgw = config.ConfigWriter()
606 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
607 sshkey, self.op.mac_prefix,
608 self.op.vg_name, self.op.def_bridge)
611 class LUDestroyCluster(NoHooksLU):
612 """Logical unit for destroying the cluster.
617 def CheckPrereq(self):
618 """Check prerequisites.
620 This checks whether the cluster is empty.
622 Any errors are signalled by raising errors.OpPrereqError.
625 master = self.sstore.GetMasterNode()
627 nodelist = self.cfg.GetNodeList()
628 if len(nodelist) != 1 or nodelist[0] != master:
629 raise errors.OpPrereqError("There are still %d node(s) in"
630 " this cluster." % (len(nodelist) - 1))
631 instancelist = self.cfg.GetInstanceList()
633 raise errors.OpPrereqError("There are still %d instance(s) in"
634 " this cluster." % len(instancelist))
636 def Exec(self, feedback_fn):
637 """Destroys the cluster.
640 master = self.sstore.GetMasterNode()
641 if not rpc.call_node_stop_master(master):
642 raise errors.OpExecError("Could not disable the master role")
643 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
644 utils.CreateBackup(priv_key)
645 utils.CreateBackup(pub_key)
646 rpc.call_node_leave_cluster(master)
649 class LUVerifyCluster(NoHooksLU):
650 """Verifies the cluster status.
653 _OP_REQP = ["skip_checks"]
655 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
656 remote_version, feedback_fn):
657 """Run multiple tests against a node.
660 - compares ganeti version
661 - checks vg existence and size > 20G
662 - checks config file checksum
663 - checks ssh to other nodes
666 node: name of the node to check
667 file_list: required list of files
668 local_cksum: dictionary of local files and their checksums
671 # compares ganeti version
672 local_version = constants.PROTOCOL_VERSION
673 if not remote_version:
674 feedback_fn(" - ERROR: connection to %s failed" % (node))
677 if local_version != remote_version:
678 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
679 (local_version, node, remote_version))
682 # checks vg existence and size > 20G
686 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
690 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
692 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
695 # checks config file checksum
698 if 'filelist' not in node_result:
700 feedback_fn(" - ERROR: node hasn't returned file checksum data")
702 remote_cksum = node_result['filelist']
703 for file_name in file_list:
704 if file_name not in remote_cksum:
706 feedback_fn(" - ERROR: file '%s' missing" % file_name)
707 elif remote_cksum[file_name] != local_cksum[file_name]:
709 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
711 if 'nodelist' not in node_result:
713 feedback_fn(" - ERROR: node hasn't returned node connectivity data")
715 if node_result['nodelist']:
717 for node in node_result['nodelist']:
718 feedback_fn(" - ERROR: communication with node '%s': %s" %
719 (node, node_result['nodelist'][node]))
720 hyp_result = node_result.get('hypervisor', None)
721 if hyp_result is not None:
722 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
725 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
726 node_instance, feedback_fn):
727 """Verify an instance.
729 This function checks to see if the required block devices are
730 available on the instance's node.
735 node_current = instanceconfig.primary_node
738 instanceconfig.MapLVsByNode(node_vol_should)
740 for node in node_vol_should:
741 for volume in node_vol_should[node]:
742 if node not in node_vol_is or volume not in node_vol_is[node]:
743 feedback_fn(" - ERROR: volume %s missing on node %s" %
747 if instanceconfig.status != 'down':
748 if (node_current not in node_instance or
749 instance not in node_instance[node_current]):
750 feedback_fn(" - ERROR: instance %s not running on node %s" %
751 (instance, node_current))
754 for node in node_instance:
755 if node != node_current:
756 if instance in node_instance[node]:
757 feedback_fn(" - ERROR: instance %s should not run on node %s" %
763 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
764 """Verify if there are any unknown volumes in the cluster.
766 The .os, .swap and backup volumes are ignored. All other volumes are reported as unknown.
772 for node in node_vol_is:
773 for volume in node_vol_is[node]:
774 if node not in node_vol_should or volume not in node_vol_should[node]:
775 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
780 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
781 """Verify the list of running instances.
783 This checks what instances are running but unknown to the cluster.
787 for node in node_instance:
788 for runninginstance in node_instance[node]:
789 if runninginstance not in instancelist:
790 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
791 (runninginstance, node))
795 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
796 """Verify N+1 Memory Resilience.
798 Check that if one single node dies we can still start all the instances it was primary for.
804 for node, nodeinfo in node_info.iteritems():
805 # This code checks that every node which is now listed as secondary has
806 # enough memory to host all the instances it is supposed to host, should a single
807 # other node in the cluster fail.
808 # FIXME: not ready for failover to an arbitrary node
809 # FIXME: does not support file-backed instances
810 # WARNING: we currently take into account down instances as well as up
811 # ones, considering that even if they're down someone might want to start
812 # them even in the event of a node failure.
813 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
815 for instance in instances:
816 needed_mem += instance_cfg[instance].memory
817 if nodeinfo['mfree'] < needed_mem:
818 feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
819 " failovers should node %s fail" % (node, prinode))
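# Worked example (illustrative numbers): if node2 has mfree = 2048 MiB and is
# secondary for instances i1 (1024 MiB) and i2 (1536 MiB) whose primary is
# node1, a failure of node1 would require 2560 MiB on node2, so the check
# above reports node2 as not N+1 compliant with respect to node1.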
823 def CheckPrereq(self):
824 """Check prerequisites.
826 Transform the list of checks we're going to skip into a set and check that
827 all its members are valid.
830 self.skip_set = frozenset(self.op.skip_checks)
831 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
832 raise errors.OpPrereqError("Invalid checks to be skipped specified")
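# Usage sketch (illustrative): the opcode carries a list of optional checks to
# skip, e.g. [constants.VERIFY_NPLUSONE_MEM] to omit the N+1 memory check
# performed in Exec below; any entry outside VERIFY_OPTIONAL_CHECKS makes this
# method raise OpPrereqError.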
834 def Exec(self, feedback_fn):
835 """Verify integrity of cluster, performing various tests on nodes.
839 feedback_fn("* Verifying global settings")
840 for msg in self.cfg.VerifyConfig():
841 feedback_fn(" - ERROR: %s" % msg)
843 vg_name = self.cfg.GetVGName()
844 nodelist = utils.NiceSort(self.cfg.GetNodeList())
845 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
846 i_non_redundant = [] # Non redundant instances
852 # FIXME: verify OS list
854 file_names = list(self.sstore.GetFileList())
855 file_names.append(constants.SSL_CERT_FILE)
856 file_names.append(constants.CLUSTER_CONF_FILE)
857 local_checksums = utils.FingerprintFiles(file_names)
859 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
860 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
861 all_instanceinfo = rpc.call_instance_list(nodelist)
862 all_vglist = rpc.call_vg_list(nodelist)
863 node_verify_param = {
864 'filelist': file_names,
865 'nodelist': nodelist,
868 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
869 all_rversion = rpc.call_version(nodelist)
870 all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
872 for node in nodelist:
873 feedback_fn("* Verifying node %s" % node)
874 result = self._VerifyNode(node, file_names, local_checksums,
875 all_vglist[node], all_nvinfo[node],
876 all_rversion[node], feedback_fn)
880 volumeinfo = all_volumeinfo[node]
882 if isinstance(volumeinfo, basestring):
883 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
884 (node, volumeinfo[-400:].encode('string_escape')))
886 node_volume[node] = {}
887 elif not isinstance(volumeinfo, dict):
888 feedback_fn(" - ERROR: connection to %s failed" % (node,))
892 node_volume[node] = volumeinfo
895 nodeinstance = all_instanceinfo[node]
896 if not isinstance(nodeinstance, list):
897 feedback_fn(" - ERROR: connection to %s failed" % (node,))
901 node_instance[node] = nodeinstance
904 nodeinfo = all_ninfo[node]
905 if not isinstance(nodeinfo, dict):
906 feedback_fn(" - ERROR: connection to %s failed" % (node,))
912 "mfree": int(nodeinfo['memory_free']),
913 "dfree": int(nodeinfo['vg_free']),
916 # dictionary holding all instances this node is secondary for,
917 # grouped by their primary node. Each key is a cluster node, and each
918 # value is a list of instances which have the key as primary and the
919 # current node as secondary. This is handy to calculate N+1 memory
920 # availability if you can only failover from a primary to its secondary.
922 "sinst-by-pnode": {},
925 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
931 for instance in instancelist:
932 feedback_fn("* Verifying instance %s" % instance)
933 inst_config = self.cfg.GetInstanceInfo(instance)
934 result = self._VerifyInstance(instance, inst_config, node_volume,
935 node_instance, feedback_fn)
938 inst_config.MapLVsByNode(node_vol_should)
940 instance_cfg[instance] = inst_config
942 pnode = inst_config.primary_node
943 if pnode in node_info:
944 node_info[pnode]['pinst'].append(instance)
946 feedback_fn(" - ERROR: instance %s, connection to primary node"
947 " %s failed" % (instance, pnode))
950 # If the instance is non-redundant we cannot survive losing its primary
951 # node, so we are not N+1 compliant. On the other hand we have no disk
952 # templates with more than one secondary so that situation is not well supported.
954 # FIXME: does not support file-backed instances
955 if len(inst_config.secondary_nodes) == 0:
956 i_non_redundant.append(instance)
957 elif len(inst_config.secondary_nodes) > 1:
958 feedback_fn(" - WARNING: multiple secondaries for instance %s"
961 for snode in inst_config.secondary_nodes:
962 if snode in node_info:
963 node_info[snode]['sinst'].append(instance)
964 if pnode not in node_info[snode]['sinst-by-pnode']:
965 node_info[snode]['sinst-by-pnode'][pnode] = []
966 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
968 feedback_fn(" - ERROR: instance %s, connection to secondary node"
969 " %s failed" % (instance, snode))
971 feedback_fn("* Verifying orphan volumes")
972 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
976 feedback_fn("* Verifying remaining instances")
977 result = self._VerifyOrphanInstances(instancelist, node_instance,
981 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
982 feedback_fn("* Verifying N+1 Memory redundancy")
983 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
986 feedback_fn("* Other Notes")
988 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
989 % len(i_non_redundant))
994 class LUVerifyDisks(NoHooksLU):
995 """Verifies the cluster disks status.
1000 def CheckPrereq(self):
1001 """Check prerequisites.
1003 This has no prerequisites.
1008 def Exec(self, feedback_fn):
1009 """Verify integrity of cluster disks.
1012 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1014 vg_name = self.cfg.GetVGName()
1015 nodes = utils.NiceSort(self.cfg.GetNodeList())
1016 instances = [self.cfg.GetInstanceInfo(name)
1017 for name in self.cfg.GetInstanceList()]
1020 for inst in instances:
1022 if (inst.status != "up" or
1023 inst.disk_template not in constants.DTS_NET_MIRROR):
1025 inst.MapLVsByNode(inst_lvs)
1026 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1027 for node, vol_list in inst_lvs.iteritems():
1028 for vol in vol_list:
1029 nv_dict[(node, vol)] = inst
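# Worked example of the transformation above (illustrative names): if an
# instance inst1 maps its LVs as {"node1": ["lv1", "lv2"], "node2": ["lv1"]},
# the loop adds the keys ("node1", "lv1"), ("node1", "lv2") and
# ("node2", "lv1") to nv_dict, each pointing at the inst1 object, so every
# (node, volume) pair can be looked up directly once the per-node LV lists
# come back from the nodes.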
1034 node_lvs = rpc.call_volume_list(nodes, vg_name)
1039 lvs = node_lvs[node]
1041 if isinstance(lvs, basestring):
1042 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1043 res_nlvm[node] = lvs
1044 elif not isinstance(lvs, dict):
1045 logger.Info("connection to node %s failed or invalid data returned" %
1047 res_nodes.append(node)
1050 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1051 inst = nv_dict.pop((node, lv_name), None)
1052 if (not lv_online and inst is not None
1053 and inst.name not in res_instances):
1054 res_instances.append(inst.name)
1056 # any leftover items in nv_dict are missing LVs, let's arrange the
1058 for key, inst in nv_dict.iteritems():
1059 if inst.name not in res_missing:
1060 res_missing[inst.name] = []
1061 res_missing[inst.name].append(key)
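# The four result containers filled above form the returned tuple, e.g.
# (illustrative values):
#   res_nodes     = ["node3"]                    # nodes that failed to answer
#   res_nlvm      = {"node2": "lvm error text"}  # per-node LVM errors
#   res_instances = ["inst1"]                    # instances with offline LVs
#   res_missing   = {"inst2": [("node1", "lv1")]}  # missing (node, volume) pairs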
1066 class LURenameCluster(LogicalUnit):
1067 """Rename the cluster.
1070 HPATH = "cluster-rename"
1071 HTYPE = constants.HTYPE_CLUSTER
1074 def BuildHooksEnv(self):
1079 "OP_TARGET": self.sstore.GetClusterName(),
1080 "NEW_NAME": self.op.name,
1082 mn = self.sstore.GetMasterNode()
1083 return env, [mn], [mn]
1085 def CheckPrereq(self):
1086 """Verify that the passed name is a valid one.
1089 hostname = utils.HostInfo(self.op.name)
1091 new_name = hostname.name
1092 self.ip = new_ip = hostname.ip
1093 old_name = self.sstore.GetClusterName()
1094 old_ip = self.sstore.GetMasterIP()
1095 if new_name == old_name and new_ip == old_ip:
1096 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1097 " cluster has changed")
1098 if new_ip != old_ip:
1099 result = utils.RunCmd(["fping", "-q", new_ip])
1100 if not result.failed:
1101 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1102 " reachable on the network. Aborting." %
1105 self.op.name = new_name
1107 def Exec(self, feedback_fn):
1108 """Rename the cluster.
1111 clustername = self.op.name
1115 # shutdown the master IP
1116 master = ss.GetMasterNode()
1117 if not rpc.call_node_stop_master(master):
1118 raise errors.OpExecError("Could not disable the master role")
1122 ss.SetKey(ss.SS_MASTER_IP, ip)
1123 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1125 # Distribute updated ss config to all nodes
1126 myself = self.cfg.GetNodeInfo(master)
1127 dist_nodes = self.cfg.GetNodeList()
1128 if myself.name in dist_nodes:
1129 dist_nodes.remove(myself.name)
1131 logger.Debug("Copying updated ssconf data to all nodes")
1132 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1133 fname = ss.KeyToFilename(keyname)
1134 result = rpc.call_upload_file(dist_nodes, fname)
1135 for to_node in dist_nodes:
1136 if not result[to_node]:
1137 logger.Error("copy of file %s to node %s failed" %
1140 if not rpc.call_node_start_master(master):
1141 logger.Error("Could not re-enable the master role on the master,"
1142 " please restart manually.")
1145 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1146 """Sleep and poll for an instance's disk to sync.
1149 if not instance.disks:
1153 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1155 node = instance.primary_node
1157 for dev in instance.disks:
1158 cfgw.SetDiskID(dev, node)
1164 cumul_degraded = False
1165 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1167 proc.LogWarning("Can't get any data from node %s" % node)
1170 raise errors.RemoteError("Can't contact node %s for mirror data,"
1171 " aborting." % node)
1175 for i in range(len(rstats)):
1178 proc.LogWarning("Can't compute data for node %s/%s" %
1179 (node, instance.disks[i].iv_name))
1181 # we ignore the ldisk parameter
1182 perc_done, est_time, is_degraded, _ = mstat
1183 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1184 if perc_done is not None:
1186 if est_time is not None:
1187 rem_time = "%d estimated seconds remaining" % est_time
1190 rem_time = "no time estimate"
1191 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1192 (instance.disks[i].iv_name, perc_done, rem_time))
1199 time.sleep(min(60, max_time))
1205 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1206 return not cumul_degraded
1209 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1210 """Check that mirrors are not degraded.
1212 The ldisk parameter, if True, will change the test from the
1213 is_degraded attribute (which represents overall non-ok status for
1214 the device(s)) to the ldisk (representing the local storage status).
1217 cfgw.SetDiskID(dev, node)
1224 if on_primary or dev.AssembleOnSecondary():
1225 rstats = rpc.call_blockdev_find(node, dev)
1227 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1230 result = result and (not rstats[idx])
1232 for child in dev.children:
1233 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1238 class LUDiagnoseOS(NoHooksLU):
1239 """Logical unit for OS diagnose/query.
1244 def CheckPrereq(self):
1245 """Check prerequisites.
1247 This always succeeds, since this is a pure query LU.
1252 def Exec(self, feedback_fn):
1253 """Compute the list of OSes.
1256 node_list = self.cfg.GetNodeList()
1257 node_data = rpc.call_os_diagnose(node_list)
1258 if node_data == False:
1259 raise errors.OpExecError("Can't gather the list of OSes")
1263 class LURemoveNode(LogicalUnit):
1264 """Logical unit for removing a node.
1267 HPATH = "node-remove"
1268 HTYPE = constants.HTYPE_NODE
1269 _OP_REQP = ["node_name"]
1271 def BuildHooksEnv(self):
1274 This doesn't run on the target node in the pre phase as a failed
1275 node would not allow itself to run.
1279 "OP_TARGET": self.op.node_name,
1280 "NODE_NAME": self.op.node_name,
1282 all_nodes = self.cfg.GetNodeList()
1283 all_nodes.remove(self.op.node_name)
1284 return env, all_nodes, all_nodes
1286 def CheckPrereq(self):
1287 """Check prerequisites.
1290 - the node exists in the configuration
1291 - it does not have primary or secondary instances
1292 - it's not the master
1294 Any errors are signalled by raising errors.OpPrereqError.
1297 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1299 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1301 instance_list = self.cfg.GetInstanceList()
1303 masternode = self.sstore.GetMasterNode()
1304 if node.name == masternode:
1305 raise errors.OpPrereqError("Node is the master node,"
1306 " you need to failover first.")
1308 for instance_name in instance_list:
1309 instance = self.cfg.GetInstanceInfo(instance_name)
1310 if node.name == instance.primary_node:
1311 raise errors.OpPrereqError("Instance %s still running on the node,"
1312 " please remove first." % instance_name)
1313 if node.name in instance.secondary_nodes:
1314 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1315 " please remove first." % instance_name)
1316 self.op.node_name = node.name
1319 def Exec(self, feedback_fn):
1320 """Removes the node from the cluster.
1324 logger.Info("stopping the node daemon and removing configs from node %s" %
1327 rpc.call_node_leave_cluster(node.name)
1329 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1331 logger.Info("Removing node %s from config" % node.name)
1333 self.cfg.RemoveNode(node.name)
1335 _RemoveHostFromEtcHosts(node.name)
1338 class LUQueryNodes(NoHooksLU):
1339 """Logical unit for querying nodes.
1342 _OP_REQP = ["output_fields", "names"]
1344 def CheckPrereq(self):
1345 """Check prerequisites.
1347 This checks that the fields required are valid output fields.
1350 self.dynamic_fields = frozenset(["dtotal", "dfree",
1351 "mtotal", "mnode", "mfree",
1354 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1355 "pinst_list", "sinst_list",
1357 dynamic=self.dynamic_fields,
1358 selected=self.op.output_fields)
1360 self.wanted = _GetWantedNodes(self, self.op.names)
1362 def Exec(self, feedback_fn):
1363 """Computes the list of nodes and their attributes.
1366 nodenames = self.wanted
1367 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1369 # begin data gathering
1371 if self.dynamic_fields.intersection(self.op.output_fields):
1373 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1374 for name in nodenames:
1375 nodeinfo = node_data.get(name, None)
1378 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1379 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1380 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1381 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1382 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1383 "bootid": nodeinfo['bootid'],
1386 live_data[name] = {}
1388 live_data = dict.fromkeys(nodenames, {})
1390 node_to_primary = dict([(name, set()) for name in nodenames])
1391 node_to_secondary = dict([(name, set()) for name in nodenames])
1393 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1394 "sinst_cnt", "sinst_list"))
1395 if inst_fields & frozenset(self.op.output_fields):
1396 instancelist = self.cfg.GetInstanceList()
1398 for instance_name in instancelist:
1399 inst = self.cfg.GetInstanceInfo(instance_name)
1400 if inst.primary_node in node_to_primary:
1401 node_to_primary[inst.primary_node].add(inst.name)
1402 for secnode in inst.secondary_nodes:
1403 if secnode in node_to_secondary:
1404 node_to_secondary[secnode].add(inst.name)
1406 # end data gathering
1409 for node in nodelist:
1411 for field in self.op.output_fields:
1414 elif field == "pinst_list":
1415 val = list(node_to_primary[node.name])
1416 elif field == "sinst_list":
1417 val = list(node_to_secondary[node.name])
1418 elif field == "pinst_cnt":
1419 val = len(node_to_primary[node.name])
1420 elif field == "sinst_cnt":
1421 val = len(node_to_secondary[node.name])
1422 elif field == "pip":
1423 val = node.primary_ip
1424 elif field == "sip":
1425 val = node.secondary_ip
1426 elif field in self.dynamic_fields:
1427 val = live_data[node.name].get(field, None)
1429 raise errors.ParameterError(field)
1430 node_output.append(val)
1431 output.append(node_output)
1436 class LUQueryNodeVolumes(NoHooksLU):
1437 """Logical unit for getting volumes on node(s).
1440 _OP_REQP = ["nodes", "output_fields"]
1442 def CheckPrereq(self):
1443 """Check prerequisites.
1445 This checks that the fields required are valid output fields.
1448 self.nodes = _GetWantedNodes(self, self.op.nodes)
1450 _CheckOutputFields(static=["node"],
1451 dynamic=["phys", "vg", "name", "size", "instance"],
1452 selected=self.op.output_fields)
1455 def Exec(self, feedback_fn):
1456 """Computes the list of nodes and their attributes.
1459 nodenames = self.nodes
1460 volumes = rpc.call_node_volumes(nodenames)
1462 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1463 in self.cfg.GetInstanceList()]
1465 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1468 for node in nodenames:
1469 if node not in volumes or not volumes[node]:
1472 node_vols = volumes[node][:]
1473 node_vols.sort(key=lambda vol: vol['dev'])
1475 for vol in node_vols:
1477 for field in self.op.output_fields:
1480 elif field == "phys":
1484 elif field == "name":
1486 elif field == "size":
1487 val = int(float(vol['size']))
1488 elif field == "instance":
1490 if node not in lv_by_node[inst]:
1492 if vol['name'] in lv_by_node[inst][node]:
1498 raise errors.ParameterError(field)
1499 node_output.append(str(val))
1501 output.append(node_output)
1506 class LUAddNode(LogicalUnit):
1507 """Logical unit for adding node to the cluster.
1511 HTYPE = constants.HTYPE_NODE
1512 _OP_REQP = ["node_name"]
1514 def BuildHooksEnv(self):
1517 This will run on all nodes before, and on all nodes + the new node after.
1521 "OP_TARGET": self.op.node_name,
1522 "NODE_NAME": self.op.node_name,
1523 "NODE_PIP": self.op.primary_ip,
1524 "NODE_SIP": self.op.secondary_ip,
1526 nodes_0 = self.cfg.GetNodeList()
1527 nodes_1 = nodes_0 + [self.op.node_name, ]
1528 return env, nodes_0, nodes_1
1530 def CheckPrereq(self):
1531 """Check prerequisites.
1534 - the new node is not already in the config
1536 - its parameters (single/dual homed) match the cluster
1538 Any errors are signalled by raising errors.OpPrereqError.
1541 node_name = self.op.node_name
1544 dns_data = utils.HostInfo(node_name)
1546 node = dns_data.name
1547 primary_ip = self.op.primary_ip = dns_data.ip
1548 secondary_ip = getattr(self.op, "secondary_ip", None)
1549 if secondary_ip is None:
1550 secondary_ip = primary_ip
1551 if not utils.IsValidIP(secondary_ip):
1552 raise errors.OpPrereqError("Invalid secondary IP given")
1553 self.op.secondary_ip = secondary_ip
1554 node_list = cfg.GetNodeList()
1555 if node in node_list:
1556 raise errors.OpPrereqError("Node %s is already in the configuration"
1559 for existing_node_name in node_list:
1560 existing_node = cfg.GetNodeInfo(existing_node_name)
1561 if (existing_node.primary_ip == primary_ip or
1562 existing_node.secondary_ip == primary_ip or
1563 existing_node.primary_ip == secondary_ip or
1564 existing_node.secondary_ip == secondary_ip):
1565 raise errors.OpPrereqError("New node ip address(es) conflict with"
1566 " existing node %s" % existing_node.name)
1568 # check that the type of the node (single versus dual homed) is the
1569 # same as for the master
1570 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1571 master_singlehomed = myself.secondary_ip == myself.primary_ip
1572 newbie_singlehomed = secondary_ip == primary_ip
1573 if master_singlehomed != newbie_singlehomed:
1574 if master_singlehomed:
1575 raise errors.OpPrereqError("The master has no private ip but the"
1576 " new node has one")
1578 raise errors.OpPrereqError("The master has a private ip but the"
1579 " new node doesn't have one")
1581 # checks reachability
1582 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1583 raise errors.OpPrereqError("Node not reachable by ping")
1585 if not newbie_singlehomed:
1586 # check reachability from my secondary ip to newbie's secondary ip
1587 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1588 source=myself.secondary_ip):
1589 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1590 " based ping to noded port")
1592 self.new_node = objects.Node(name=node,
1593 primary_ip=primary_ip,
1594 secondary_ip=secondary_ip)
1596 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1597 if not os.path.exists(constants.VNC_PASSWORD_FILE):
1598 raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1599 constants.VNC_PASSWORD_FILE)
1601 def Exec(self, feedback_fn):
1602 """Adds the new node to the cluster.
1605 new_node = self.new_node
1606 node = new_node.name
1608 # set up the inter-node password and certificate and restart the node daemon
1609 gntpass = self.sstore.GetNodeDaemonPassword()
1610 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1611 raise errors.OpExecError("ganeti password corruption detected")
1612 f = open(constants.SSL_CERT_FILE)
1614 gntpem = f.read(8192)
1617 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1618 # so we use this to detect an invalid certificate; as long as the
1619 # cert doesn't contain this, the here-document will be correctly
1620 # parsed by the shell sequence below
1621 if re.search(r'^!EOF\.', gntpem, re.MULTILINE):
1622 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1623 if not gntpem.endswith("\n"):
1624 raise errors.OpExecError("PEM must end with newline")
1625 logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1627 # and then connect with ssh to set password and start ganeti-noded
1628 # note that all the below variables are sanitized at this point,
1629 # either by being constants or by the checks above
1631 mycommand = ("umask 077 && "
1632 "echo '%s' > '%s' && "
1633 "cat > '%s' << '!EOF.' && \n"
1634 "%s!EOF.\n%s restart" %
1635 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1636 constants.SSL_CERT_FILE, gntpem,
1637 constants.NODE_INITD_SCRIPT))
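# For illustration, the command built above expands to something like this
# (bracketed values are placeholders, not real paths or secrets):
#
#   umask 077 && echo '<password>' > '<noded password file>' && cat > '<SSL_CERT_FILE>' << '!EOF.' &&
#   <PEM certificate data>
#   !EOF.
#   <NODE_INITD_SCRIPT> restart
#
# which is why the checks above insist that the PEM data ends with a newline
# and contains no line starting with "!EOF.".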
1639 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1641 raise errors.OpExecError("Remote command on node %s, error: %s,"
1643 (node, result.fail_reason, result.output))
1645 # check connectivity
1648 result = rpc.call_version([node])[node]
1650 if constants.PROTOCOL_VERSION == result:
1651 logger.Info("communication to node %s fine, sw version %s match" %
1654 raise errors.OpExecError("Version mismatch master version %s,"
1655 " node version %s" %
1656 (constants.PROTOCOL_VERSION, result))
1658 raise errors.OpExecError("Cannot get version from the new node")
1661 logger.Info("copy ssh key to node %s" % node)
1662 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1664 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1665 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1671 keyarray.append(f.read())
1675 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1676 keyarray[3], keyarray[4], keyarray[5])
1679 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1681 # Add node to our /etc/hosts, and add key to known_hosts
1682 _AddHostToEtcHosts(new_node.name)
1684 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1685 self.cfg.GetHostKey())
1687 if new_node.secondary_ip != new_node.primary_ip:
1688 if not rpc.call_node_tcp_ping(new_node.name,
1689 constants.LOCALHOST_IP_ADDRESS,
1690 new_node.secondary_ip,
1691 constants.DEFAULT_NODED_PORT,
1693 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1694 " you gave (%s). Please fix and re-run this"
1695 " command." % new_node.secondary_ip)
1697 success, msg = ssh.VerifyNodeHostname(node)
1699 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1700 " than the one the resolver gives: %s."
1701 " Please fix and re-run this command." %
1704 # Distribute updated /etc/hosts and known_hosts to all nodes,
1705 # including the node just added
1706 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1707 dist_nodes = self.cfg.GetNodeList() + [node]
1708 if myself.name in dist_nodes:
1709 dist_nodes.remove(myself.name)
1711 logger.Debug("Copying hosts and known_hosts to all nodes")
1712 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1713 result = rpc.call_upload_file(dist_nodes, fname)
1714 for to_node in dist_nodes:
1715 if not result[to_node]:
1716 logger.Error("copy of file %s to node %s failed" %
1719 to_copy = ss.GetFileList()
1720 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1721 to_copy.append(constants.VNC_PASSWORD_FILE)
1722 for fname in to_copy:
1723 if not ssh.CopyFileToNode(node, fname):
1724 logger.Error("could not copy file %s to node %s" % (fname, node))
1726 logger.Info("adding node %s to cluster.conf" % node)
1727 self.cfg.AddNode(new_node)
1730 class LUMasterFailover(LogicalUnit):
1731 """Failover the master node to the current node.
1733 This is a special LU in that it must run on a non-master node.
1736 HPATH = "master-failover"
1737 HTYPE = constants.HTYPE_CLUSTER
1741 def BuildHooksEnv(self):
1744 This will run on the new master only in the pre phase, and on all
1745 the nodes in the post phase.
1749 "OP_TARGET": self.new_master,
1750 "NEW_MASTER": self.new_master,
1751 "OLD_MASTER": self.old_master,
1753 return env, [self.new_master], self.cfg.GetNodeList()
1755 def CheckPrereq(self):
1756 """Check prerequisites.
1758 This checks that we are not already the master.
1761 self.new_master = utils.HostInfo().name
1762 self.old_master = self.sstore.GetMasterNode()
1764 if self.old_master == self.new_master:
1765 raise errors.OpPrereqError("This command must be run on the node"
1766 " where you want the new master to be."
1767 " %s is already the master" %
1770 def Exec(self, feedback_fn):
1771 """Failover the master node.
1773 This command, when run on a non-master node, will cause the current
1774 master to cease being master, and the non-master to become the new master.
1778 #TODO: do not rely on gethostname returning the FQDN
1779 logger.Info("setting master to %s, old master: %s" %
1780 (self.new_master, self.old_master))
1782 if not rpc.call_node_stop_master(self.old_master):
1783 logger.Error("could not disable the master role on the old master"
1784 " %s, please disable manually" % self.old_master)
1787 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1788 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1789 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1790 logger.Error("could not distribute the new simple store master file"
1791 " to the other nodes, please check.")
1793 if not rpc.call_node_start_master(self.new_master):
1794 logger.Error("could not start the master role on the new master"
1795 " %s, please check" % self.new_master)
1796 feedback_fn("Error in activating the master IP on the new master,"
1797 " please fix manually.")
1801 class LUQueryClusterInfo(NoHooksLU):
1802 """Query cluster configuration.
1808 def CheckPrereq(self):
1809 """No prerequisites needed for this LU.
1814 def Exec(self, feedback_fn):
1815 """Return cluster config.
1819 "name": self.sstore.GetClusterName(),
1820 "software_version": constants.RELEASE_VERSION,
1821 "protocol_version": constants.PROTOCOL_VERSION,
1822 "config_version": constants.CONFIG_VERSION,
1823 "os_api_version": constants.OS_API_VERSION,
1824 "export_version": constants.EXPORT_VERSION,
1825 "master": self.sstore.GetMasterNode(),
1826 "architecture": (platform.architecture()[0], platform.machine()),
1832 class LUClusterCopyFile(NoHooksLU):
1833 """Copy file to cluster.
1836 _OP_REQP = ["nodes", "filename"]
1838 def CheckPrereq(self):
1839 """Check prerequisites.
1841 It should check that the named file exists and that the given list of nodes is valid.
1845 if not os.path.exists(self.op.filename):
1846 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1848 self.nodes = _GetWantedNodes(self, self.op.nodes)
1850 def Exec(self, feedback_fn):
1851 """Copy a file from master to some nodes.
1854 opts - class with options as members
1855 args - list containing a single element, the file name
1857 nodes - list containing the name of target nodes; if empty, all nodes
1860 filename = self.op.filename
1862 myname = utils.HostInfo().name
1864 for node in self.nodes:
1867 if not ssh.CopyFileToNode(node, filename):
1868 logger.Error("Copy of file %s to node %s failed" % (filename, node))
1871 class LUDumpClusterConfig(NoHooksLU):
1872 """Return a text-representation of the cluster-config.
1877 def CheckPrereq(self):
1878 """No prerequisites.
1883 def Exec(self, feedback_fn):
1884 """Dump a representation of the cluster config to the standard output.
1887 return self.cfg.DumpConfig()
1890 class LURunClusterCommand(NoHooksLU):
1891 """Run a command on some nodes.
1894 _OP_REQP = ["command", "nodes"]
1896 def CheckPrereq(self):
1897 """Check prerequisites.
1899 It checks that the given list of nodes is valid.
1902 self.nodes = _GetWantedNodes(self, self.op.nodes)
1904 def Exec(self, feedback_fn):
1905 """Run a command on some nodes.
1908 # put the master at the end of the nodes list
1909 master_node = self.sstore.GetMasterNode()
1910 if master_node in self.nodes:
1911 self.nodes.remove(master_node)
1912 self.nodes.append(master_node)
1915 for node in self.nodes:
1916 result = ssh.SSHCall(node, "root", self.op.command)
1917 data.append((node, result.output, result.exit_code))
1922 class LUActivateInstanceDisks(NoHooksLU):
1923 """Bring up an instance's disks.
1926 _OP_REQP = ["instance_name"]
1928 def CheckPrereq(self):
1929 """Check prerequisites.
1931 This checks that the instance is in the cluster.
1934 instance = self.cfg.GetInstanceInfo(
1935 self.cfg.ExpandInstanceName(self.op.instance_name))
1936 if instance is None:
1937 raise errors.OpPrereqError("Instance '%s' not known" %
1938 self.op.instance_name)
1939 self.instance = instance
1942 def Exec(self, feedback_fn):
1943 """Activate the disks.
1946 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1948 raise errors.OpExecError("Cannot activate block devices")
1953 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1954 """Prepare the block devices for an instance.
1956 This sets up the block devices on all nodes.
1959 instance: a ganeti.objects.Instance object
1960 ignore_secondaries: if true, errors on secondary nodes won't result
1961 in an error return from the function
1964 false if the operation failed
1965 list of (host, instance_visible_name, node_visible_name) if the operation
1966 succeeded with the mapping from node devices to instance devices
1970 iname = instance.name
1971 # With the two passes mechanism we try to reduce the window of
1972 # opportunity for the race condition of switching DRBD to primary
1973 # before handshaking occurred, but we do not eliminate it
1975 # The proper fix would be to wait (with some limits) until the
1976 # connection has been made and drbd transitions from WFConnection
1977 # into any other network-connected state (Connected, SyncTarget,
1980 # 1st pass, assemble on all nodes in secondary mode
1981 for inst_disk in instance.disks:
1982 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1983 cfg.SetDiskID(node_disk, node)
1984 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1986 logger.Error("could not prepare block device %s on node %s"
1987 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1988 if not ignore_secondaries:
1991 # FIXME: race condition on drbd migration to primary
1993 # 2nd pass, do only the primary node
1994 for inst_disk in instance.disks:
1995 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1996 if node != instance.primary_node:
1998 cfg.SetDiskID(node_disk, node)
1999 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2001 logger.Error("could not prepare block device %s on node %s"
2002 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2004 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2006 # leave the disks configured for the primary node
2007 # this is a workaround that would be fixed better by
2008 # improving the logical/physical id handling
2009 for disk in instance.disks:
2010 cfg.SetDiskID(disk, instance.primary_node)
2012 return disks_ok, device_info
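# Example of the returned values (illustrative names, assuming the assemble
# RPC returns the node device path as the docstring above suggests):
#
#   (True, [("node1", "sda", "/dev/drbd0"), ("node1", "sdb", "/dev/drbd1")])
#
# i.e. a success flag plus one (primary node, instance-visible name, node
# device) tuple per disk.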
2015 def _StartInstanceDisks(cfg, instance, force):
2016 """Start the disks of an instance.
2019 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2020 ignore_secondaries=force)
2022 _ShutdownInstanceDisks(instance, cfg)
2023 if force is not None and not force:
2024 logger.Error("If the message above refers to a secondary node,"
2025 " you can retry the operation using '--force'.")
2026 raise errors.OpExecError("Disk consistency error")
2029 class LUDeactivateInstanceDisks(NoHooksLU):
2030 """Shutdown an instance's disks.
2033 _OP_REQP = ["instance_name"]
2035 def CheckPrereq(self):
2036 """Check prerequisites.
2038 This checks that the instance is in the cluster.
2041 instance = self.cfg.GetInstanceInfo(
2042 self.cfg.ExpandInstanceName(self.op.instance_name))
2043 if instance is None:
2044 raise errors.OpPrereqError("Instance '%s' not known" %
2045 self.op.instance_name)
2046 self.instance = instance
2048 def Exec(self, feedback_fn):
2049 """Deactivate the disks
2052 instance = self.instance
2053 ins_l = rpc.call_instance_list([instance.primary_node])
2054 ins_l = ins_l[instance.primary_node]
2055 if not isinstance(ins_l, list):
2056 raise errors.OpExecError("Can't contact node '%s'" %
2057 instance.primary_node)
2059 if self.instance.name in ins_l:
2060 raise errors.OpExecError("Instance is running, can't shutdown"
2063 _ShutdownInstanceDisks(instance, self.cfg)
2066 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2067 """Shutdown block devices of an instance.
2069 This does the shutdown on all nodes of the instance.
2071 If ignore_primary is true, errors on the primary node are ignored.
2076 for disk in instance.disks:
2077 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2078 cfg.SetDiskID(top_disk, node)
2079 if not rpc.call_blockdev_shutdown(node, top_disk):
2080 logger.Error("could not shutdown block device %s on node %s" %
2081 (disk.iv_name, node))
2082 if not ignore_primary or node != instance.primary_node:
2087 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2088 """Checks if a node has enough free memory.
2090 This function checks if a given node has the needed amount of free
2091 memory. In case the node has less memory or we cannot get the
2092 information from the node, this function raises an OpPrereqError exception.
2096 - cfg: a ConfigWriter instance
2097 - node: the node name
2098 - reason: string to use in the error message
2099 - requested: the amount of memory in MiB
2102 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2103 if not nodeinfo or not isinstance(nodeinfo, dict):
2104 raise errors.OpPrereqError("Could not contact node %s for resource"
2105 " information" % (node,))
2107 free_mem = nodeinfo[node].get('memory_free')
2108 if not isinstance(free_mem, int):
2109 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2110 " was '%s'" % (node, free_mem))
2111 if requested > free_mem:
2112 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2113 " needed %s MiB, available %s MiB" %
2114 (node, reason, requested, free_mem))
2117 class LUStartupInstance(LogicalUnit):
2118 """Starts an instance.
2121 HPATH = "instance-start"
2122 HTYPE = constants.HTYPE_INSTANCE
2123 _OP_REQP = ["instance_name", "force"]
2125 def BuildHooksEnv(self):
2128 This runs on master, primary and secondary nodes of the instance.
2132 "FORCE": self.op.force,
2134 env.update(_BuildInstanceHookEnvByObject(self.instance))
2135 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2136 list(self.instance.secondary_nodes))
2139 def CheckPrereq(self):
2140 """Check prerequisites.
2142 This checks that the instance is in the cluster.
2145 instance = self.cfg.GetInstanceInfo(
2146 self.cfg.ExpandInstanceName(self.op.instance_name))
2147 if instance is None:
2148 raise errors.OpPrereqError("Instance '%s' not known" %
2149 self.op.instance_name)
2151 # check bridges existence
2152 _CheckInstanceBridgesExist(instance)
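# Also make sure the primary node currently has enough free memory for the
# instance before attempting to start it.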
2154 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2155 "starting instance %s" % instance.name,
2158 self.instance = instance
2159 self.op.instance_name = instance.name
2161 def Exec(self, feedback_fn):
2162 """Start the instance.
2165 instance = self.instance
2166 force = self.op.force
2167 extra_args = getattr(self.op, "extra_args", "")
2169 self.cfg.MarkInstanceUp(instance.name)
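# The administrative state is recorded as 'up' first; the disks are then
# assembled and the hypervisor asked to start the instance, with the disks
# shut down again if the start fails.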
2171 node_current = instance.primary_node
2173 _StartInstanceDisks(self.cfg, instance, force)
2175 if not rpc.call_instance_start(node_current, instance, extra_args):
2176 _ShutdownInstanceDisks(instance, self.cfg)
2177 raise errors.OpExecError("Could not start instance")
2180 class LURebootInstance(LogicalUnit):
2181 """Reboot an instance.
2184 HPATH = "instance-reboot"
2185 HTYPE = constants.HTYPE_INSTANCE
2186 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2188 def BuildHooksEnv(self):
2191 This runs on master, primary and secondary nodes of the instance.
2195 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2197 env.update(_BuildInstanceHookEnvByObject(self.instance))
2198 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2199 list(self.instance.secondary_nodes))
2202 def CheckPrereq(self):
2203 """Check prerequisites.
2205 This checks that the instance is in the cluster.
2208 instance = self.cfg.GetInstanceInfo(
2209 self.cfg.ExpandInstanceName(self.op.instance_name))
2210 if instance is None:
2211 raise errors.OpPrereqError("Instance '%s' not known" %
2212 self.op.instance_name)
2214 # check bridges existence
2215 _CheckInstanceBridgesExist(instance)
2217 self.instance = instance
2218 self.op.instance_name = instance.name
2220 def Exec(self, feedback_fn):
2221 """Reboot the instance.
2224 instance = self.instance
2225 ignore_secondaries = self.op.ignore_secondaries
2226 reboot_type = self.op.reboot_type
2227 extra_args = getattr(self.op, "extra_args", "")
2229 node_current = instance.primary_node
2231 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2232 constants.INSTANCE_REBOOT_HARD,
2233 constants.INSTANCE_REBOOT_FULL]:
2234 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2235 (constants.INSTANCE_REBOOT_SOFT,
2236 constants.INSTANCE_REBOOT_HARD,
2237 constants.INSTANCE_REBOOT_FULL))
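# Soft and hard reboots are delegated to the node's hypervisor in a single
# RPC; a full reboot is emulated below by a shutdown, disk re-activation
# and start cycle.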
2239 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2240 constants.INSTANCE_REBOOT_HARD]:
2241 if not rpc.call_instance_reboot(node_current, instance,
2242 reboot_type, extra_args):
2243 raise errors.OpExecError("Could not reboot instance")
2245 if not rpc.call_instance_shutdown(node_current, instance):
2246 raise errors.OpExecError("could not shutdown instance for full reboot")
2247 _ShutdownInstanceDisks(instance, self.cfg)
2248 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2249 if not rpc.call_instance_start(node_current, instance, extra_args):
2250 _ShutdownInstanceDisks(instance, self.cfg)
2251 raise errors.OpExecError("Could not start instance for full reboot")
2253 self.cfg.MarkInstanceUp(instance.name)
2256 class LUShutdownInstance(LogicalUnit):
2257 """Shutdown an instance.
2260 HPATH = "instance-stop"
2261 HTYPE = constants.HTYPE_INSTANCE
2262 _OP_REQP = ["instance_name"]
2264 def BuildHooksEnv(self):
2267 This runs on master, primary and secondary nodes of the instance.
2270 env = _BuildInstanceHookEnvByObject(self.instance)
2271 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2272 list(self.instance.secondary_nodes))
2275 def CheckPrereq(self):
2276 """Check prerequisites.
2278 This checks that the instance is in the cluster.
2281 instance = self.cfg.GetInstanceInfo(
2282 self.cfg.ExpandInstanceName(self.op.instance_name))
2283 if instance is None:
2284 raise errors.OpPrereqError("Instance '%s' not known" %
2285 self.op.instance_name)
2286 self.instance = instance
2288 def Exec(self, feedback_fn):
2289 """Shutdown the instance.
2292 instance = self.instance
2293 node_current = instance.primary_node
2294 self.cfg.MarkInstanceDown(instance.name)
2295 if not rpc.call_instance_shutdown(node_current, instance):
2296 logger.Error("could not shutdown instance")
2298 _ShutdownInstanceDisks(instance, self.cfg)
2301 class LUReinstallInstance(LogicalUnit):
2302 """Reinstall an instance.
2305 HPATH = "instance-reinstall"
2306 HTYPE = constants.HTYPE_INSTANCE
2307 _OP_REQP = ["instance_name"]
2309 def BuildHooksEnv(self):
2312 This runs on master, primary and secondary nodes of the instance.
2315 env = _BuildInstanceHookEnvByObject(self.instance)
2316 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2317 list(self.instance.secondary_nodes))
2320 def CheckPrereq(self):
2321 """Check prerequisites.
2323 This checks that the instance is in the cluster and is not running.
2326 instance = self.cfg.GetInstanceInfo(
2327 self.cfg.ExpandInstanceName(self.op.instance_name))
2328 if instance is None:
2329 raise errors.OpPrereqError("Instance '%s' not known" %
2330 self.op.instance_name)
2331 if instance.disk_template == constants.DT_DISKLESS:
2332 raise errors.OpPrereqError("Instance '%s' has no disks" %
2333 self.op.instance_name)
2334 if instance.status != "down":
2335 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2336 self.op.instance_name)
2337 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2339 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2340 (self.op.instance_name,
2341 instance.primary_node))
2343 self.op.os_type = getattr(self.op, "os_type", None)
2344 if self.op.os_type is not None:
2346 pnode = self.cfg.GetNodeInfo(
2347 self.cfg.ExpandNodeName(instance.primary_node))
2349 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2351 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2353 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2354 " primary node" % self.op.os_type)
2356 self.instance = instance
2358 def Exec(self, feedback_fn):
2359 """Reinstall the instance.
2362 inst = self.instance
2364 if self.op.os_type is not None:
2365 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2366 inst.os = self.op.os_type
2367 self.cfg.AddInstance(inst)
2369 _StartInstanceDisks(self.cfg, inst, None)
2371 feedback_fn("Running the instance OS create scripts...")
2372 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2373 raise errors.OpExecError("Could not install OS for instance %s"
2375 (inst.name, inst.primary_node))
2377 _ShutdownInstanceDisks(inst, self.cfg)
2380 class LURenameInstance(LogicalUnit):
2381 """Rename an instance.
2384 HPATH = "instance-rename"
2385 HTYPE = constants.HTYPE_INSTANCE
2386 _OP_REQP = ["instance_name", "new_name"]
2388 def BuildHooksEnv(self):
2391 This runs on master, primary and secondary nodes of the instance.
2394 env = _BuildInstanceHookEnvByObject(self.instance)
2395 env["INSTANCE_NEW_NAME"] = self.op.new_name
2396 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2397 list(self.instance.secondary_nodes))
2400 def CheckPrereq(self):
2401 """Check prerequisites.
2403 This checks that the instance is in the cluster and is not running.
2406 instance = self.cfg.GetInstanceInfo(
2407 self.cfg.ExpandInstanceName(self.op.instance_name))
2408 if instance is None:
2409 raise errors.OpPrereqError("Instance '%s' not known" %
2410 self.op.instance_name)
2411 if instance.status != "down":
2412 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2413 self.op.instance_name)
2414 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2416 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2417 (self.op.instance_name,
2418 instance.primary_node))
2419 self.instance = instance
2421 # new name verification
2422 name_info = utils.HostInfo(self.op.new_name)
2424 self.op.new_name = new_name = name_info.name
2425 instance_list = self.cfg.GetInstanceList()
2426 if new_name in instance_list:
2427 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2430 if not getattr(self.op, "ignore_ip", False):
2431 command = ["fping", "-q", name_info.ip]
2432 result = utils.RunCmd(command)
2433 if not result.failed:
2434 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2435 (name_info.ip, new_name))
2438 def Exec(self, feedback_fn):
2439 """Reinstall the instance.
2442 inst = self.instance
2443 old_name = inst.name
2445 self.cfg.RenameInstance(inst.name, self.op.new_name)
2447 # re-read the instance from the configuration after rename
2448 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2450 _StartInstanceDisks(self.cfg, inst, None)
2452 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2454 msg = ("Could run OS rename script for instance %s on node %s (but the"
2455 " instance has been renamed in Ganeti)" %
2456 (inst.name, inst.primary_node))
2459 _ShutdownInstanceDisks(inst, self.cfg)
2462 class LURemoveInstance(LogicalUnit):
2463 """Remove an instance.
2466 HPATH = "instance-remove"
2467 HTYPE = constants.HTYPE_INSTANCE
2468 _OP_REQP = ["instance_name"]
2470 def BuildHooksEnv(self):
2473 This runs on master, primary and secondary nodes of the instance.
2476 env = _BuildInstanceHookEnvByObject(self.instance)
2477 nl = [self.sstore.GetMasterNode()]
2480 def CheckPrereq(self):
2481 """Check prerequisites.
2483 This checks that the instance is in the cluster.
2486 instance = self.cfg.GetInstanceInfo(
2487 self.cfg.ExpandInstanceName(self.op.instance_name))
2488 if instance is None:
2489 raise errors.OpPrereqError("Instance '%s' not known" %
2490 self.op.instance_name)
2491 self.instance = instance
2493 def Exec(self, feedback_fn):
2494 """Remove the instance.
2497 instance = self.instance
2498 logger.Info("shutting down instance %s on node %s" %
2499 (instance.name, instance.primary_node))
2501 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2502 if self.op.ignore_failures:
2503 feedback_fn("Warning: can't shutdown instance")
2505 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2506 (instance.name, instance.primary_node))
2508 logger.Info("removing block devices for instance %s" % instance.name)
2510 if not _RemoveDisks(instance, self.cfg):
2511 if self.op.ignore_failures:
2512 feedback_fn("Warning: can't remove instance's disks")
2514 raise errors.OpExecError("Can't remove instance's disks")
2516 logger.Info("removing instance %s out of cluster config" % instance.name)
2518 self.cfg.RemoveInstance(instance.name)
2521 class LUQueryInstances(NoHooksLU):
2522 """Logical unit for querying instances.
2525 _OP_REQP = ["output_fields", "names"]
2527 def CheckPrereq(self):
2528 """Check prerequisites.
2530 This checks that the fields required are valid output fields.
2533 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2534 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2535 "admin_state", "admin_ram",
2536 "disk_template", "ip", "mac", "bridge",
2537 "sda_size", "sdb_size", "vcpus"],
2538 dynamic=self.dynamic_fields,
2539 selected=self.op.output_fields)
2541 self.wanted = _GetWantedInstances(self, self.op.names)
2543 def Exec(self, feedback_fn):
2544 """Computes the list of nodes and their attributes.
2547 instance_names = self.wanted
2548 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2551 # begin data gathering
2553 nodes = frozenset([inst.primary_node for inst in instance_list])
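# Live data is gathered from the primary nodes only when dynamic fields
# (oper_state, oper_ram, status) were requested; otherwise each instance
# simply gets an empty live-data dict.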
2556 if self.dynamic_fields.intersection(self.op.output_fields):
2558 node_data = rpc.call_all_instances_info(nodes)
2560 result = node_data[name]
2562 live_data.update(result)
2563 elif result is False:
2564 bad_nodes.append(name)
2565 # else no instance is alive
2567 live_data = dict([(name, {}) for name in instance_names])
2569 # end data gathering
2572 for instance in instance_list:
2574 for field in self.op.output_fields:
2579 elif field == "pnode":
2580 val = instance.primary_node
2581 elif field == "snodes":
2582 val = list(instance.secondary_nodes)
2583 elif field == "admin_state":
2584 val = (instance.status != "down")
2585 elif field == "oper_state":
2586 if instance.primary_node in bad_nodes:
2589 val = bool(live_data.get(instance.name))
2590 elif field == "status":
2591 if instance.primary_node in bad_nodes:
2592 val = "ERROR_nodedown"
2594 running = bool(live_data.get(instance.name))
2596 if instance.status != "down":
2601 if instance.status != "down":
2605 elif field == "admin_ram":
2606 val = instance.memory
2607 elif field == "oper_ram":
2608 if instance.primary_node in bad_nodes:
2610 elif instance.name in live_data:
2611 val = live_data[instance.name].get("memory", "?")
2614 elif field == "disk_template":
2615 val = instance.disk_template
2617 val = instance.nics[0].ip
2618 elif field == "bridge":
2619 val = instance.nics[0].bridge
2620 elif field == "mac":
2621 val = instance.nics[0].mac
2622 elif field == "sda_size" or field == "sdb_size":
2623 disk = instance.FindDisk(field[:3])
2628 elif field == "vcpus":
2629 val = instance.vcpus
2631 raise errors.ParameterError(field)
2638 class LUFailoverInstance(LogicalUnit):
2639 """Failover an instance.
2642 HPATH = "instance-failover"
2643 HTYPE = constants.HTYPE_INSTANCE
2644 _OP_REQP = ["instance_name", "ignore_consistency"]
2646 def BuildHooksEnv(self):
2649 This runs on master, primary and secondary nodes of the instance.
2653 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2655 env.update(_BuildInstanceHookEnvByObject(self.instance))
2656 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2659 def CheckPrereq(self):
2660 """Check prerequisites.
2662 This checks that the instance is in the cluster.
2665 instance = self.cfg.GetInstanceInfo(
2666 self.cfg.ExpandInstanceName(self.op.instance_name))
2667 if instance is None:
2668 raise errors.OpPrereqError("Instance '%s' not known" %
2669 self.op.instance_name)
2671 if instance.disk_template not in constants.DTS_NET_MIRROR:
2672 raise errors.OpPrereqError("Instance's disk layout is not"
2673 " network mirrored, cannot failover.")
2675 secondary_nodes = instance.secondary_nodes
2676 if not secondary_nodes:
2677 raise errors.ProgrammerError("no secondary node but using "
2678 "DT_REMOTE_RAID1 template")
2680 target_node = secondary_nodes[0]
2681 # check memory requirements on the secondary node
2682 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2683 instance.name, instance.memory)
2685 # check bridge existence
2686 brlist = [nic.bridge for nic in instance.nics]
2687 if not rpc.call_bridges_exist(target_node, brlist):
2688 raise errors.OpPrereqError("One or more target bridges %s does not"
2689 " exist on destination node '%s'" %
2690 (brlist, target_node))
2692 self.instance = instance
2694 def Exec(self, feedback_fn):
2695 """Failover an instance.
2697 The failover is done by shutting it down on its present node and
2698 starting it on the secondary.
2701 instance = self.instance
2703 source_node = instance.primary_node
2704 target_node = instance.secondary_nodes[0]
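# Failover always targets the (single) secondary node of the mirrored
# instance; disk consistency on that node is verified first.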
2706 feedback_fn("* checking disk consistency between source and target")
2707 for dev in instance.disks:
2708 # for remote_raid1, these are md over drbd
2709 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2710 if instance.status == "up" and not self.op.ignore_consistency:
2711 raise errors.OpExecError("Disk %s is degraded on target node,"
2712 " aborting failover." % dev.iv_name)
2714 feedback_fn("* shutting down instance on source node")
2715 logger.Info("Shutting down instance %s on node %s" %
2716 (instance.name, source_node))
2718 if not rpc.call_instance_shutdown(source_node, instance):
2719 if self.op.ignore_consistency:
2720 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2721 " anyway. Please make sure node %s is down" %
2722 (instance.name, source_node, source_node))
2724 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2725 (instance.name, source_node))
2727 feedback_fn("* deactivating the instance's disks on source node")
2728 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2729 raise errors.OpExecError("Can't shut down the instance's disks.")
2731 instance.primary_node = target_node
2732 # distribute new instance config to the other nodes
2733 self.cfg.AddInstance(instance)
2735 # Only start the instance if it's marked as up
2736 if instance.status == "up":
2737 feedback_fn("* activating the instance's disks on target node")
2738 logger.Info("Starting instance %s on node %s" %
2739 (instance.name, target_node))
2741 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2742 ignore_secondaries=True)
2744 _ShutdownInstanceDisks(instance, self.cfg)
2745 raise errors.OpExecError("Can't activate the instance's disks")
2747 feedback_fn("* starting the instance on the target node")
2748 if not rpc.call_instance_start(target_node, instance, None):
2749 _ShutdownInstanceDisks(instance, self.cfg)
2750 raise errors.OpExecError("Could not start instance %s on node %s." %
2751 (instance.name, target_node))
2754 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2755 """Create a tree of block devices on the primary node.
2757 This always creates all devices.
2761 for child in device.children:
2762 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2765 cfg.SetDiskID(device, node)
2766 new_id = rpc.call_blockdev_create(node, device, device.size,
2767 instance.name, True, info)
2770 if device.physical_id is None:
2771 device.physical_id = new_id
2775 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2776 """Create a tree of block devices on a secondary node.
2778 If this device type has to be created on secondaries, create it and
2781 If not, just recurse to children keeping the same 'force' value.
2784 if device.CreateOnSecondary():
2787 for child in device.children:
2788 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2789 child, force, info):
2794 cfg.SetDiskID(device, node)
2795 new_id = rpc.call_blockdev_create(node, device, device.size,
2796 instance.name, False, info)
2799 if device.physical_id is None:
2800 device.physical_id = new_id
2804 def _GenerateUniqueNames(cfg, exts):
2805 """Generate a suitable LV name.
2807 This will generate a logical volume name for the given instance.
2812 new_id = cfg.GenerateUniqueID()
2813 results.append("%s%s" % (new_id, val))
2817 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2818 """Generate a drbd device complete with its children.
2821 port = cfg.AllocatePort()
2822 vgname = cfg.GetVGName()
2823 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2824 logical_id=(vgname, names[0]))
2825 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2826 logical_id=(vgname, names[1]))
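# The fixed 128 MB volume holds the DRBD metadata for this mirror; the
# data volume uses the requested size.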
2827 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2828 logical_id = (primary, secondary, port),
2829 children = [dev_data, dev_meta])
2833 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2834 """Generate a drbd8 device complete with its children.
2837 port = cfg.AllocatePort()
2838 vgname = cfg.GetVGName()
2839 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2840 logical_id=(vgname, names[0]))
2841 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2842 logical_id=(vgname, names[1]))
2843 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2844 logical_id = (primary, secondary, port),
2845 children = [dev_data, dev_meta],
2849 def _GenerateDiskTemplate(cfg, template_name,
2850 instance_name, primary_node,
2851 secondary_nodes, disk_sz, swap_sz):
2852 """Generate the entire disk layout for a given template type.
2855 #TODO: compute space requirements
2857 vgname = cfg.GetVGName()
2858 if template_name == constants.DT_DISKLESS:
2860 elif template_name == constants.DT_PLAIN:
2861 if len(secondary_nodes) != 0:
2862 raise errors.ProgrammerError("Wrong template configuration")
2864 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2865 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2866 logical_id=(vgname, names[0]),
2868 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2869 logical_id=(vgname, names[1]),
2871 disks = [sda_dev, sdb_dev]
2872 elif template_name == constants.DT_LOCAL_RAID1:
2873 if len(secondary_nodes) != 0:
2874 raise errors.ProgrammerError("Wrong template configuration")
2877 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2878 ".sdb_m1", ".sdb_m2"])
2879 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2880 logical_id=(vgname, names[0]))
2881 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2882 logical_id=(vgname, names[1]))
2883 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2885 children = [sda_dev_m1, sda_dev_m2])
2886 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2887 logical_id=(vgname, names[2]))
2888 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2889 logical_id=(vgname, names[3]))
2890 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2892 children = [sdb_dev_m1, sdb_dev_m2])
2893 disks = [md_sda_dev, md_sdb_dev]
2894 elif template_name == constants.DT_REMOTE_RAID1:
2895 if len(secondary_nodes) != 1:
2896 raise errors.ProgrammerError("Wrong template configuration")
2897 remote_node = secondary_nodes[0]
2898 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2899 ".sdb_data", ".sdb_meta"])
2900 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2901 disk_sz, names[0:2])
2902 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2903 children = [drbd_sda_dev], size=disk_sz)
2904 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2905 swap_sz, names[2:4])
2906 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2907 children = [drbd_sdb_dev], size=swap_sz)
2908 disks = [md_sda_dev, md_sdb_dev]
2909 elif template_name == constants.DT_DRBD8:
2910 if len(secondary_nodes) != 1:
2911 raise errors.ProgrammerError("Wrong template configuration")
2912 remote_node = secondary_nodes[0]
2913 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2914 ".sdb_data", ".sdb_meta"])
2915 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2916 disk_sz, names[0:2], "sda")
2917 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2918 swap_sz, names[2:4], "sdb")
2919 disks = [drbd_sda_dev, drbd_sdb_dev]
2921 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2925 def _GetInstanceInfoText(instance):
2926 """Compute that text that should be added to the disk's metadata.
2929 return "originstname+%s" % instance.name
2932 def _CreateDisks(cfg, instance):
2933 """Create all disks for an instance.
2935 This abstracts away some work from AddInstance.
2938 instance: the instance object
2941 True or False showing the success of the creation process
2944 info = _GetInstanceInfoText(instance)
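# Each device tree is created on the secondary nodes first and then on the
# primary node, where every device in the tree is always created.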
2946 for device in instance.disks:
2947 logger.Info("creating volume %s for instance %s" %
2948 (device.iv_name, instance.name))
2950 for secondary_node in instance.secondary_nodes:
2951 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2952 device, False, info):
2953 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2954 (device.iv_name, device, secondary_node))
2957 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2958 instance, device, info):
2959 logger.Error("failed to create volume %s on primary!" %
2965 def _RemoveDisks(instance, cfg):
2966 """Remove all disks for an instance.
2968 This abstracts away some work from `AddInstance()` and
2969 `RemoveInstance()`. Note that in case some of the devices couldn't
2970 be removed, the removal will continue with the other ones (compare
2971 with `_CreateDisks()`).
2974 instance: the instance object
2977 True or False showing the success of the removal process
2980 logger.Info("removing block devices for instance %s" % instance.name)
2983 for device in instance.disks:
2984 for node, disk in device.ComputeNodeTree(instance.primary_node):
2985 cfg.SetDiskID(disk, node)
2986 if not rpc.call_blockdev_remove(node, disk):
2987 logger.Error("could not remove block device %s on node %s,"
2988 " continuing anyway" %
2989 (device.iv_name, node))
2994 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2995 """Compute disk size requirements in the volume group
2997 This is currently hard-coded for the two-drive layout.
3000 # Required free disk space as a function of disk and swap space
3002 constants.DT_DISKLESS: None,
3003 constants.DT_PLAIN: disk_size + swap_size,
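# local_raid1 mirrors both volumes on the same node, hence twice the space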
3004 constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3005 # 256 MB are added for drbd metadata, 128 MB for each drbd device
3006 constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3007 constants.DT_DRBD8: disk_size + swap_size + 256,
3010 if disk_template not in req_size_dict:
3011 raise errors.ProgrammerError("Disk template '%s' size requirement"
3012 " is unknown" % disk_template)
3014 return req_size_dict[disk_template]
3017 class LUCreateInstance(LogicalUnit):
3018 """Create an instance.
3021 HPATH = "instance-add"
3022 HTYPE = constants.HTYPE_INSTANCE
3023 _OP_REQP = ["instance_name", "mem_size", "disk_size",
3024 "disk_template", "swap_size", "mode", "start", "vcpus",
3025 "wait_for_sync", "ip_check", "mac"]
3027 def _RunAllocator(self):
3028 """Run the allocator based on input opcode.
3031 al_data = _IAllocatorGetClusterData(self.cfg, self.sstore)
3032 disks = [{"size": self.op.disk_size, "mode": "w"},
3033 {"size": self.op.swap_size, "mode": "w"}]
3034 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3035 "bridge": self.op.bridge}]
3036 op = opcodes.OpTestAllocator(name=self.op.instance_name,
3037 disk_template=self.op.disk_template,
3040 vcpus=self.op.vcpus,
3041 mem_size=self.op.mem_size,
3045 _IAllocatorAddNewInstance(al_data, op)
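# The combined cluster + request data is serialized and fed to the external
# iallocator script; its answer is validated before the node names are used.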
3047 text = serializer.Dump(al_data)
3049 result = _IAllocatorRun(self.op.iallocator, text)
3051 result = _IAllocatorValidateResult(result)
3053 if not result["success"]:
3054 raise errors.OpPrereqError("Can't compute nodes using"
3055 " iallocator '%s': %s" % (self.op.iallocator,
3058 if self.op.disk_template in constants.DTS_NET_MIRROR:
3061 if len(result["nodes"]) != req_nodes:
3062 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3063 " of nodes (%s), required %s" %
3064 (self.op.iallocator, len(result["nodes"]), req_nodes))
3065 self.op.pnode = result["nodes"][0]
3066 logger.ToStdout("Selected nodes for the instance: %s" %
3067 (", ".join(result["nodes"]),))
3068 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3069 (self.op.instance_name, self.op.iallocator, result["nodes"]))
3071 self.op.snode = result["nodes"][1]
3073 def BuildHooksEnv(self):
3076 This runs on master, primary and secondary nodes of the instance.
3080 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3081 "INSTANCE_DISK_SIZE": self.op.disk_size,
3082 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3083 "INSTANCE_ADD_MODE": self.op.mode,
3085 if self.op.mode == constants.INSTANCE_IMPORT:
3086 env["INSTANCE_SRC_NODE"] = self.op.src_node
3087 env["INSTANCE_SRC_PATH"] = self.op.src_path
3088 env["INSTANCE_SRC_IMAGE"] = self.src_image
3090 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3091 primary_node=self.op.pnode,
3092 secondary_nodes=self.secondaries,
3093 status=self.instance_status,
3094 os_type=self.op.os_type,
3095 memory=self.op.mem_size,
3096 vcpus=self.op.vcpus,
3097 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3100 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3105 def CheckPrereq(self):
3106 """Check prerequisites.
3109 # set optional parameters to None if they don't exist
3110 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3112 if not hasattr(self.op, attr):
3113 setattr(self.op, attr, None)
3115 if self.op.mode not in (constants.INSTANCE_CREATE,
3116 constants.INSTANCE_IMPORT):
3117 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3120 if self.op.mode == constants.INSTANCE_IMPORT:
3121 src_node = getattr(self.op, "src_node", None)
3122 src_path = getattr(self.op, "src_path", None)
3123 if src_node is None or src_path is None:
3124 raise errors.OpPrereqError("Importing an instance requires source"
3125 " node and path options")
3126 src_node_full = self.cfg.ExpandNodeName(src_node)
3127 if src_node_full is None:
3128 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3129 self.op.src_node = src_node = src_node_full
3131 if not os.path.isabs(src_path):
3132 raise errors.OpPrereqError("The source path must be absolute")
3134 export_info = rpc.call_export_info(src_node, src_path)
3137 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3139 if not export_info.has_section(constants.INISECT_EXP):
3140 raise errors.ProgrammerError("Corrupted export config")
3142 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3143 if (int(ei_version) != constants.EXPORT_VERSION):
3144 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3145 (ei_version, constants.EXPORT_VERSION))
3147 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3148 raise errors.OpPrereqError("Can't import instance with more than"
3151 # FIXME: are the old os-es, disk sizes, etc. useful?
3152 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3153 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3155 self.src_image = diskimage
3156 else: # INSTANCE_CREATE
3157 if getattr(self.op, "os_type", None) is None:
3158 raise errors.OpPrereqError("No guest OS specified")
3160 #### instance parameters check
3162 # disk template and mirror node verification
3163 if self.op.disk_template not in constants.DISK_TEMPLATES:
3164 raise errors.OpPrereqError("Invalid disk template name")
3166 # instance name verification
3167 hostname1 = utils.HostInfo(self.op.instance_name)
3169 self.op.instance_name = instance_name = hostname1.name
3170 instance_list = self.cfg.GetInstanceList()
3171 if instance_name in instance_list:
3172 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3175 # ip validity checks
3176 ip = getattr(self.op, "ip", None)
3177 if ip is None or ip.lower() == "none":
3179 elif ip.lower() == "auto":
3180 inst_ip = hostname1.ip
3182 if not utils.IsValidIP(ip):
3183 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3184 " like a valid IP" % ip)
3186 self.inst_ip = self.op.ip = inst_ip
3188 if self.op.start and not self.op.ip_check:
3189 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3190 " adding an instance in start mode")
3192 if self.op.ip_check:
3193 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3194 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3195 (hostname1.ip, instance_name))
3197 # MAC address verification
3198 if self.op.mac != "auto":
3199 if not utils.IsValidMac(self.op.mac.lower()):
3200 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3203 # bridge verification
3204 bridge = getattr(self.op, "bridge", None)
3206 self.op.bridge = self.cfg.GetDefBridge()
3208 self.op.bridge = bridge
3210 # boot order verification
3211 if self.op.hvm_boot_order is not None:
3212 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3213 raise errors.OpPrereqError("invalid boot order specified,"
3214 " must be one or more of [acdn]")
3217 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3218 raise errors.OpPrereqError("One and only one of iallocator and primary"
3219 " node must be given")
3221 if self.op.iallocator is not None:
3222 self._RunAllocator()
3224 #### node related checks
3226 # check primary node
3227 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3229 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3231 self.op.pnode = pnode.name
3233 self.secondaries = []
3235 # mirror node verification
3236 if self.op.disk_template in constants.DTS_NET_MIRROR:
3237 if getattr(self.op, "snode", None) is None:
3238 raise errors.OpPrereqError("The networked disk templates need"
3241 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3242 if snode_name is None:
3243 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3245 elif snode_name == pnode.name:
3246 raise errors.OpPrereqError("The secondary node cannot be"
3247 " the primary node.")
3248 self.secondaries.append(snode_name)
3250 req_size = _ComputeDiskSize(self.op.disk_template,
3251 self.op.disk_size, self.op.swap_size)
3253 # Check lv size requirements
3254 if req_size is not None:
3255 nodenames = [pnode.name] + self.secondaries
3256 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3257 for node in nodenames:
3258 info = nodeinfo.get(node, None)
3260 raise errors.OpPrereqError("Cannot get current information"
3261 " from node '%s'" % nodeinfo)
3262 vg_free = info.get('vg_free', None)
3263 if not isinstance(vg_free, int):
3264 raise errors.OpPrereqError("Can't compute free disk space on"
3266 if req_size > vg_free:
3267 raise errors.OpPrereqError("Not enough disk space on target node %s."
3268 " %d MB available, %d MB required" %
3269 (node, vg_free, req_size))
3272 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3274 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3275 " primary node" % self.op.os_type)
3277 if self.op.kernel_path == constants.VALUE_NONE:
3278 raise errors.OpPrereqError("Can't set instance kernel to none")
3281 # bridge check on primary node
3282 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3283 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3284 " destination node '%s'" %
3285 (self.op.bridge, pnode.name))
3288 self.instance_status = 'up'
3290 self.instance_status = 'down'
3292 def Exec(self, feedback_fn):
3293 """Create and add the instance to the cluster.
3296 instance = self.op.instance_name
3297 pnode_name = self.pnode.name
3299 if self.op.mac == "auto":
3300 mac_address = self.cfg.GenerateMAC()
3302 mac_address = self.op.mac
3304 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3305 if self.inst_ip is not None:
3306 nic.ip = self.inst_ip
3308 ht_kind = self.sstore.GetHypervisorType()
3309 if ht_kind in constants.HTS_REQ_PORT:
3310 network_port = self.cfg.AllocatePort()
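# Only hypervisor types listed in HTS_REQ_PORT get a cluster-wide unique
# network port allocated (typically used for remote console access).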
3314 disks = _GenerateDiskTemplate(self.cfg,
3315 self.op.disk_template,
3316 instance, pnode_name,
3317 self.secondaries, self.op.disk_size,
3320 iobj = objects.Instance(name=instance, os=self.op.os_type,
3321 primary_node=pnode_name,
3322 memory=self.op.mem_size,
3323 vcpus=self.op.vcpus,
3324 nics=[nic], disks=disks,
3325 disk_template=self.op.disk_template,
3326 status=self.instance_status,
3327 network_port=network_port,
3328 kernel_path=self.op.kernel_path,
3329 initrd_path=self.op.initrd_path,
3330 hvm_boot_order=self.op.hvm_boot_order,
3333 feedback_fn("* creating instance disks...")
3334 if not _CreateDisks(self.cfg, iobj):
3335 _RemoveDisks(iobj, self.cfg)
3336 raise errors.OpExecError("Device creation failed, reverting...")
3338 feedback_fn("adding instance %s to cluster config" % instance)
3340 self.cfg.AddInstance(iobj)
3342 if self.op.wait_for_sync:
3343 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3344 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3345 # make sure the disks are not degraded (still sync-ing is ok)
3347 feedback_fn("* checking mirrors status")
3348 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3353 _RemoveDisks(iobj, self.cfg)
3354 self.cfg.RemoveInstance(iobj.name)
3355 raise errors.OpExecError("There are some degraded disks for"
3358 feedback_fn("creating os for instance %s on node %s" %
3359 (instance, pnode_name))
3361 if iobj.disk_template != constants.DT_DISKLESS:
3362 if self.op.mode == constants.INSTANCE_CREATE:
3363 feedback_fn("* running the instance OS create scripts...")
3364 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3365 raise errors.OpExecError("could not add os for instance %s"
3367 (instance, pnode_name))
3369 elif self.op.mode == constants.INSTANCE_IMPORT:
3370 feedback_fn("* running the instance OS import scripts...")
3371 src_node = self.op.src_node
3372 src_image = self.src_image
3373 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3374 src_node, src_image):
3375 raise errors.OpExecError("Could not import os for instance"
3377 (instance, pnode_name))
3379 # also checked in the prereq part
3380 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3384 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3385 feedback_fn("* starting instance...")
3386 if not rpc.call_instance_start(pnode_name, iobj, None):
3387 raise errors.OpExecError("Could not start instance")
3390 class LUConnectConsole(NoHooksLU):
3391 """Connect to an instance's console.
3393 This is somewhat special in that it returns the command line that
3394 you need to run on the master node in order to connect to the console.
3398 _OP_REQP = ["instance_name"]
3400 def CheckPrereq(self):
3401 """Check prerequisites.
3403 This checks that the instance is in the cluster.
3406 instance = self.cfg.GetInstanceInfo(
3407 self.cfg.ExpandInstanceName(self.op.instance_name))
3408 if instance is None:
3409 raise errors.OpPrereqError("Instance '%s' not known" %
3410 self.op.instance_name)
3411 self.instance = instance
3413 def Exec(self, feedback_fn):
3414 """Connect to the console of an instance
3417 instance = self.instance
3418 node = instance.primary_node
3420 node_insts = rpc.call_instance_list([node])[node]
3421 if node_insts is False:
3422 raise errors.OpExecError("Can't connect to node %s." % node)
3424 if instance.name not in node_insts:
3425 raise errors.OpExecError("Instance %s is not running." % instance.name)
3427 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3429 hyper = hypervisor.GetHypervisor()
3430 console_cmd = hyper.GetShellCommandForConsole(instance)
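# Build the ssh command line that the caller must run on the master node
# to reach the console on the instance's primary node.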
3432 argv = ["ssh", "-q", "-t"]
3433 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3434 argv.extend(ssh.BATCH_MODE_OPTS)
3436 argv.append(console_cmd)
3440 class LUAddMDDRBDComponent(LogicalUnit):
3441 """Adda new mirror member to an instance's disk.
3444 HPATH = "mirror-add"
3445 HTYPE = constants.HTYPE_INSTANCE
3446 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3448 def BuildHooksEnv(self):
3451 This runs on the master, the primary and all the secondaries.
3455 "NEW_SECONDARY": self.op.remote_node,
3456 "DISK_NAME": self.op.disk_name,
3458 env.update(_BuildInstanceHookEnvByObject(self.instance))
3459 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3460 self.op.remote_node,] + list(self.instance.secondary_nodes)
3463 def CheckPrereq(self):
3464 """Check prerequisites.
3466 This checks that the instance is in the cluster.
3469 instance = self.cfg.GetInstanceInfo(
3470 self.cfg.ExpandInstanceName(self.op.instance_name))
3471 if instance is None:
3472 raise errors.OpPrereqError("Instance '%s' not known" %
3473 self.op.instance_name)
3474 self.instance = instance
3476 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3477 if remote_node is None:
3478 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3479 self.remote_node = remote_node
3481 if remote_node == instance.primary_node:
3482 raise errors.OpPrereqError("The specified node is the primary node of"
3485 if instance.disk_template != constants.DT_REMOTE_RAID1:
3486 raise errors.OpPrereqError("Instance's disk layout is not"
3488 for disk in instance.disks:
3489 if disk.iv_name == self.op.disk_name:
3492 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3493 " instance." % self.op.disk_name)
3494 if len(disk.children) > 1:
3495 raise errors.OpPrereqError("The device already has two slave devices."
3496 " This would create a 3-disk raid1 which we"
3500 def Exec(self, feedback_fn):
3501 """Add the mirror component
3505 instance = self.instance
3507 remote_node = self.remote_node
3508 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3509 names = _GenerateUniqueNames(self.cfg, lv_names)
3510 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3511 remote_node, disk.size, names)
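# The new DRBD branch is created on the secondary node first, then on the
# primary, and finally attached as an additional child of the md mirror.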
3513 logger.Info("adding new mirror component on secondary")
3515 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3517 _GetInstanceInfoText(instance)):
3518 raise errors.OpExecError("Failed to create new component on secondary"
3519 " node %s" % remote_node)
3521 logger.Info("adding new mirror component on primary")
3523 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3525 _GetInstanceInfoText(instance)):
3526 # remove secondary dev
3527 self.cfg.SetDiskID(new_drbd, remote_node)
3528 rpc.call_blockdev_remove(remote_node, new_drbd)
3529 raise errors.OpExecError("Failed to create volume on primary")
3531 # the device exists now
3532 # call the primary node to add the mirror to md
3533 logger.Info("adding new mirror component to md")
3534 if not rpc.call_blockdev_addchildren(instance.primary_node,
3536 logger.Error("Can't add mirror compoment to md!")
3537 self.cfg.SetDiskID(new_drbd, remote_node)
3538 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3539 logger.Error("Can't rollback on secondary")
3540 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3541 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3542 logger.Error("Can't rollback on primary")
3543 raise errors.OpExecError("Can't add mirror component to md array")
3545 disk.children.append(new_drbd)
3547 self.cfg.AddInstance(instance)
3549 _WaitForSync(self.cfg, instance, self.proc)
3554 class LURemoveMDDRBDComponent(LogicalUnit):
3555 """Remove a component from a remote_raid1 disk.
3558 HPATH = "mirror-remove"
3559 HTYPE = constants.HTYPE_INSTANCE
3560 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3562 def BuildHooksEnv(self):
3565 This runs on the master, the primary and all the secondaries.
3569 "DISK_NAME": self.op.disk_name,
3570 "DISK_ID": self.op.disk_id,
3571 "OLD_SECONDARY": self.old_secondary,
3573 env.update(_BuildInstanceHookEnvByObject(self.instance))
3574 nl = [self.sstore.GetMasterNode(),
3575 self.instance.primary_node] + list(self.instance.secondary_nodes)
3578 def CheckPrereq(self):
3579 """Check prerequisites.
3581 This checks that the instance is in the cluster.
3584 instance = self.cfg.GetInstanceInfo(
3585 self.cfg.ExpandInstanceName(self.op.instance_name))
3586 if instance is None:
3587 raise errors.OpPrereqError("Instance '%s' not known" %
3588 self.op.instance_name)
3589 self.instance = instance
3591 if instance.disk_template != constants.DT_REMOTE_RAID1:
3592 raise errors.OpPrereqError("Instance's disk layout is not"
3594 for disk in instance.disks:
3595 if disk.iv_name == self.op.disk_name:
3598 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3599 " instance." % self.op.disk_name)
3600 for child in disk.children:
3601 if (child.dev_type == constants.LD_DRBD7 and
3602 child.logical_id[2] == self.op.disk_id):
3605 raise errors.OpPrereqError("Can't find the device with this port.")
3607 if len(disk.children) < 2:
3608 raise errors.OpPrereqError("Cannot remove the last component from"
3612 if self.child.logical_id[0] == instance.primary_node:
3616 self.old_secondary = self.child.logical_id[oid]
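# A DRBD child's logical_id is (node_a, node_b, port); the element that is
# not the primary node identifies the old secondary.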
3618 def Exec(self, feedback_fn):
3619 """Remove the mirror component
3622 instance = self.instance
3625 logger.Info("remove mirror component")
3626 self.cfg.SetDiskID(disk, instance.primary_node)
3627 if not rpc.call_blockdev_removechildren(instance.primary_node,
3629 raise errors.OpExecError("Can't remove child from mirror.")
3631 for node in child.logical_id[:2]:
3632 self.cfg.SetDiskID(child, node)
3633 if not rpc.call_blockdev_remove(node, child):
3634 logger.Error("Warning: failed to remove device from node %s,"
3635 " continuing operation." % node)
3637 disk.children.remove(child)
3638 self.cfg.AddInstance(instance)
3641 class LUReplaceDisks(LogicalUnit):
3642 """Replace the disks of an instance.
3645 HPATH = "mirrors-replace"
3646 HTYPE = constants.HTYPE_INSTANCE
3647 _OP_REQP = ["instance_name", "mode", "disks"]
3649 def BuildHooksEnv(self):
3652 This runs on the master, the primary and all the secondaries.
3656 "MODE": self.op.mode,
3657 "NEW_SECONDARY": self.op.remote_node,
3658 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3660 env.update(_BuildInstanceHookEnvByObject(self.instance))
3662 self.sstore.GetMasterNode(),
3663 self.instance.primary_node,
3665 if self.op.remote_node is not None:
3666 nl.append(self.op.remote_node)
3669 def CheckPrereq(self):
3670 """Check prerequisites.
3672 This checks that the instance is in the cluster.
3675 instance = self.cfg.GetInstanceInfo(
3676 self.cfg.ExpandInstanceName(self.op.instance_name))
3677 if instance is None:
3678 raise errors.OpPrereqError("Instance '%s' not known" %
3679 self.op.instance_name)
3680 self.instance = instance
3681 self.op.instance_name = instance.name
3683 if instance.disk_template not in constants.DTS_NET_MIRROR:
3684 raise errors.OpPrereqError("Instance's disk layout is not"
3685 " network mirrored.")
3687 if len(instance.secondary_nodes) != 1:
3688 raise errors.OpPrereqError("The instance has a strange layout,"
3689 " expected one secondary but found %d" %
3690 len(instance.secondary_nodes))
3692 self.sec_node = instance.secondary_nodes[0]
3694 remote_node = getattr(self.op, "remote_node", None)
3695 if remote_node is not None:
3696 remote_node = self.cfg.ExpandNodeName(remote_node)
3697 if remote_node is None:
3698 raise errors.OpPrereqError("Node '%s' not known" %
3699 self.op.remote_node)
3700 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3702 self.remote_node_info = None
3703 if remote_node == instance.primary_node:
3704 raise errors.OpPrereqError("The specified node is the primary node of"
3706 elif remote_node == self.sec_node:
3707 if self.op.mode == constants.REPLACE_DISK_SEC:
3708 # this is for DRBD8, where we can't execute the same mode of
3709 # replacement as for drbd7 (no different port allocated)
3710 raise errors.OpPrereqError("Same secondary given, cannot execute"
3712 # the user gave the current secondary, switch to
3713 # 'no-replace-secondary' mode for drbd7
3715 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3716 self.op.mode != constants.REPLACE_DISK_ALL):
3717 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3718 " disks replacement, not individual ones")
3719 if instance.disk_template == constants.DT_DRBD8:
3720 if (self.op.mode == constants.REPLACE_DISK_ALL and
3721 remote_node is not None):
3722 # switch to replace secondary mode
3723 self.op.mode = constants.REPLACE_DISK_SEC
3725 if self.op.mode == constants.REPLACE_DISK_ALL:
3726 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3727 " secondary disk replacement, not"
3729 elif self.op.mode == constants.REPLACE_DISK_PRI:
3730 if remote_node is not None:
3731 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3732 " the secondary while doing a primary"
3733 " node disk replacement")
3734 self.tgt_node = instance.primary_node
3735 self.oth_node = instance.secondary_nodes[0]
3736 elif self.op.mode == constants.REPLACE_DISK_SEC:
3737 self.new_node = remote_node # this can be None, in which case
3738 # we don't change the secondary
3739 self.tgt_node = instance.secondary_nodes[0]
3740 self.oth_node = instance.primary_node
3742 raise errors.ProgrammerError("Unhandled disk replace mode")
3744 for name in self.op.disks:
3745 if instance.FindDisk(name) is None:
3746 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3747 (name, instance.name))
3748 self.op.remote_node = remote_node
3750 def _ExecRR1(self, feedback_fn):
3751 """Replace the disks of an instance.
3754 instance = self.instance
3757 if self.op.remote_node is None:
3758 remote_node = self.sec_node
3760 remote_node = self.op.remote_node
3762 for dev in instance.disks:
3764 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3765 names = _GenerateUniqueNames(cfg, lv_names)
3766 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3767 remote_node, size, names)
3768 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3769 logger.Info("adding new mirror component on secondary for %s" %
3772 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3774 _GetInstanceInfoText(instance)):
3775 raise errors.OpExecError("Failed to create new component on secondary"
3776 " node %s. Full abort, cleanup manually!" %
3779 logger.Info("adding new mirror component on primary")
3781 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3783 _GetInstanceInfoText(instance)):
3784 # remove secondary dev
3785 cfg.SetDiskID(new_drbd, remote_node)
3786 rpc.call_blockdev_remove(remote_node, new_drbd)
3787 raise errors.OpExecError("Failed to create volume on primary!"
3788 " Full abort, cleanup manually!!")
3790 # the device exists now
3791 # call the primary node to add the mirror to md
3792 logger.Info("adding new mirror component to md")
3793 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3795 logger.Error("Can't add mirror compoment to md!")
3796 cfg.SetDiskID(new_drbd, remote_node)
3797 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3798 logger.Error("Can't rollback on secondary")
3799 cfg.SetDiskID(new_drbd, instance.primary_node)
3800 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3801 logger.Error("Can't rollback on primary")
3802 raise errors.OpExecError("Full abort, cleanup manually!!")
3804 dev.children.append(new_drbd)
3805 cfg.AddInstance(instance)
3807 # this can fail as the old devices are degraded and _WaitForSync
3808 # does a combined result over all disks, so we don't check its
3810 _WaitForSync(cfg, instance, self.proc, unlock=True)
3812 # so check manually all the devices
3813 for name in iv_names:
3814 dev, child, new_drbd = iv_names[name]
3815 cfg.SetDiskID(dev, instance.primary_node)
3816 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3818 raise errors.OpExecError("MD device %s is degraded!" % name)
3819 cfg.SetDiskID(new_drbd, instance.primary_node)
3820 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3822 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3824 for name in iv_names:
3825 dev, child, new_drbd = iv_names[name]
3826 logger.Info("remove mirror %s component" % name)
3827 cfg.SetDiskID(dev, instance.primary_node)
3828 if not rpc.call_blockdev_removechildren(instance.primary_node,
3830 logger.Error("Can't remove child from mirror, aborting"
3831 " *this device cleanup*.\nYou need to cleanup manually!!")
3834 for node in child.logical_id[:2]:
3835 logger.Info("remove child device on %s" % node)
3836 cfg.SetDiskID(child, node)
3837 if not rpc.call_blockdev_remove(node, child):
3838 logger.Error("Warning: failed to remove device from node %s,"
3839 " continuing operation." % node)
3841 dev.children.remove(child)
3843 cfg.AddInstance(instance)
3845 def _ExecD8DiskOnly(self, feedback_fn):
3846 """Replace a disk on the primary or secondary for dbrd8.
3848 The algorithm for replace is quite complicated:
3849 - for each disk to be replaced:
3850 - create new LVs on the target node with unique names
3851 - detach old LVs from the drbd device
3852 - rename old LVs to name_replaced.<time_t>
3853 - rename new LVs to old LVs
3854 - attach the new LVs (with the old names now) to the drbd device
3855 - wait for sync across all devices
3856 - for each modified disk:
3857 - remove old LVs (which have the name name_replaced.<time_t>)
3859 Failures are not very well handled.
3863 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3864 instance = self.instance
3866 vgname = self.cfg.GetVGName()
3869 tgt_node = self.tgt_node
3870 oth_node = self.oth_node
3872 # Step: check device activation
3873 self.proc.LogStep(1, steps_total, "check device existence")
3874 info("checking volume groups")
3875 my_vg = cfg.GetVGName()
3876 results = rpc.call_vg_list([oth_node, tgt_node])
3878 raise errors.OpExecError("Can't list volume groups on the nodes")
3879 for node in oth_node, tgt_node:
3880 res = results.get(node, False)
3881 if not res or my_vg not in res:
3882 raise errors.OpExecError("Volume group '%s' not found on %s" %
3884 for dev in instance.disks:
3885 if dev.iv_name not in self.op.disks:
3887 for node in tgt_node, oth_node:
3888 info("checking %s on %s" % (dev.iv_name, node))
3889 cfg.SetDiskID(dev, node)
3890 if not rpc.call_blockdev_find(node, dev):
3891 raise errors.OpExecError("Can't find device %s on node %s" %
3892 (dev.iv_name, node))
3894 # Step: check other node consistency
3895 self.proc.LogStep(2, steps_total, "check peer consistency")
3896 for dev in instance.disks:
3897 if dev.iv_name not in self.op.disks:
3899 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3900 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3901 oth_node==instance.primary_node):
3902 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3903 " to replace disks on this node (%s)" %
3904 (oth_node, tgt_node))
3906 # Step: create new storage
3907 self.proc.LogStep(3, steps_total, "allocate new storage")
3908 for dev in instance.disks:
3909 if dev.iv_name not in self.op.disks:
3912 cfg.SetDiskID(dev, tgt_node)
3913 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3914 names = _GenerateUniqueNames(cfg, lv_names)
3915 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3916 logical_id=(vgname, names[0]))
3917 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3918 logical_id=(vgname, names[1]))
3919 new_lvs = [lv_data, lv_meta]
3920 old_lvs = dev.children
3921 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3922 info("creating new local storage on %s for %s" %
3923 (tgt_node, dev.iv_name))
3924 # since we *always* want to create this LV, we use the
3925 # _Create...OnPrimary (which forces the creation), even if we
3926 # are talking about the secondary node
3927 for new_lv in new_lvs:
3928 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3929 _GetInstanceInfoText(instance)):
3930 raise errors.OpExecError("Failed to create new LV named '%s' on"
3932 (new_lv.logical_id[1], tgt_node))
3934 # Step: for each lv, detach+rename*2+attach
3935 self.proc.LogStep(4, steps_total, "change drbd configuration")
3936 for dev, old_lvs, new_lvs in iv_names.itervalues():
3937 info("detaching %s drbd from local storage" % dev.iv_name)
3938 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3939 raise errors.OpExecError("Can't detach drbd from local storage on node"
3940 " %s for device %s" % (tgt_node, dev.iv_name))
3942 #cfg.Update(instance)
3944 # ok, we created the new LVs, so now we know we have the needed
3945 # storage; as such, we proceed on the target node to rename
3946 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3947 # using the assumption that logical_id == physical_id (which in
3948 # turn is the unique_id on that node)
3950 # FIXME(iustin): use a better name for the replaced LVs
3951 temp_suffix = int(time.time())
3952 ren_fn = lambda d, suff: (d.physical_id[0],
3953 d.physical_id[1] + "_replaced-%s" % suff)
3954 # build the rename list based on what LVs exist on the node
3956 for to_ren in old_lvs:
3957 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3958 if find_res is not None: # device exists
3959 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3961 info("renaming the old LVs on the target node")
3962 if not rpc.call_blockdev_rename(tgt_node, rlist):
3963 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3964 # now we rename the new LVs to the old LVs
3965 info("renaming the new LVs on the target node")
3966 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3967 if not rpc.call_blockdev_rename(tgt_node, rlist):
3968 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3970 for old, new in zip(old_lvs, new_lvs):
3971 new.logical_id = old.logical_id
3972 cfg.SetDiskID(new, tgt_node)
3974 for disk in old_lvs:
3975 disk.logical_id = ren_fn(disk, temp_suffix)
3976 cfg.SetDiskID(disk, tgt_node)
3978 # now that the new lvs have the old name, we can add them to the device
3979 info("adding new mirror component on %s" % tgt_node)
3980 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3981 for new_lv in new_lvs:
3982 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3983 warning("Can't rollback device %s" % new_lv.logical_id[1],
3984 hint="manually cleanup unused logical volumes")
3985 raise errors.OpExecError("Can't add local storage to drbd")
3987 dev.children = new_lvs
3988 cfg.Update(instance)
3990 # Step: wait for sync
3992 # this can fail as the old devices are degraded and _WaitForSync
3993 # does a combined result over all disks, so we don't check its
3994 # return value
3995 self.proc.LogStep(5, steps_total, "sync devices")
3996 _WaitForSync(cfg, instance, self.proc, unlock=True)
3998 # so check manually all the devices
3999 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4000 cfg.SetDiskID(dev, instance.primary_node)
4001 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4002 if is_degr:
4003 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4005 # Step: remove old storage
4006 self.proc.LogStep(6, steps_total, "removing old storage")
4007 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4008 info("remove logical volumes for %s" % name)
4009 for lv in old_lvs:
4010 cfg.SetDiskID(lv, tgt_node)
4011 if not rpc.call_blockdev_remove(tgt_node, lv):
4012 warning("Can't remove old LV", hint="manually remove unused LVs")
4015 def _ExecD8Secondary(self, feedback_fn):
4016 """Replace the secondary node for drbd8.
4018 The algorithm for replace is quite complicated:
4019 - for all disks of the instance:
4020 - create new LVs on the new node with same names
4021 - shutdown the drbd device on the old secondary
4022 - disconnect the drbd network on the primary
4023 - create the drbd device on the new secondary
4024 - network attach the drbd on the primary, using an artifice:
4025 the drbd code for Attach() will connect to the network if it
4026 finds a device which is connected to the good local disks but
4027 not network enabled
4028 - wait for sync across all devices
4029 - remove all disks from the old secondary
4031 Failures are not very well handled.
4033 """
4034 steps_total = 6
4035 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4036 instance = self.instance
4037 iv_names = {}
4038 vgname = self.cfg.GetVGName()
4039 # start of work
4040 cfg = self.cfg
4041 old_node = self.tgt_node
4042 new_node = self.new_node
4043 pri_node = instance.primary_node
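# old_node is the current secondary (the one being replaced) and new_node is
# its replacement; the instance's primary node is not changed by this operation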
4045 # Step: check device activation
4046 self.proc.LogStep(1, steps_total, "check device existence")
4047 info("checking volume groups")
4048 my_vg = cfg.GetVGName()
4049 results = rpc.call_vg_list([pri_node, new_node])
4050 if not results:
4051 raise errors.OpExecError("Can't list volume groups on the nodes")
4052 for node in pri_node, new_node:
4053 res = results.get(node, False)
4054 if not res or my_vg not in res:
4055 raise errors.OpExecError("Volume group '%s' not found on %s" %
4056 (my_vg, node))
4057 for dev in instance.disks:
4058 if dev.iv_name not in self.op.disks:
4059 continue
4060 info("checking %s on %s" % (dev.iv_name, pri_node))
4061 cfg.SetDiskID(dev, pri_node)
4062 if not rpc.call_blockdev_find(pri_node, dev):
4063 raise errors.OpExecError("Can't find device %s on node %s" %
4064 (dev.iv_name, pri_node))
4066 # Step: check other node consistency
4067 self.proc.LogStep(2, steps_total, "check peer consistency")
4068 for dev in instance.disks:
4069 if dev.iv_name not in self.op.disks:
4070 continue
4071 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4072 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4073 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4074 " unsafe to replace the secondary" %
4075 pri_node)
4077 # Step: create new storage
4078 self.proc.LogStep(3, steps_total, "allocate new storage")
4079 for dev in instance.disks:
4081 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4082 # since we *always* want to create this LV, we use the
4083 # _Create...OnPrimary (which forces the creation), even if we
4084 # are talking about the secondary node
4085 for new_lv in dev.children:
4086 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4087 _GetInstanceInfoText(instance)):
4088 raise errors.OpExecError("Failed to create new LV named '%s' on"
4089 " node '%s'" %
4090 (new_lv.logical_id[1], new_node))
4092 iv_names[dev.iv_name] = (dev, dev.children)
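# the mapping stored above is what step 6 uses to find these LVs on the old
# secondary and remove them once the new secondary is in sync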
4094 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4095 for dev in instance.disks:
4097 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4098 # create new devices on new_node
4099 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4100 logical_id=(pri_node, new_node,
4101 dev.logical_id[2]),
4102 children=dev.children)
4103 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4104 new_drbd, False,
4105 _GetInstanceInfoText(instance)):
4106 raise errors.OpExecError("Failed to create new DRBD on"
4107 " node '%s'" % new_node)
4109 for dev in instance.disks:
4110 # we have new devices, shutdown the drbd on the old secondary
4111 info("shutting down drbd for %s on old node" % dev.iv_name)
4112 cfg.SetDiskID(dev, old_node)
4113 if not rpc.call_blockdev_shutdown(old_node, dev):
4114 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4115 hint="Please cleanup this device manually as soon as possible")
4117 info("detaching primary drbds from the network (=> standalone)")
4118 done = 0
4119 for dev in instance.disks:
4120 cfg.SetDiskID(dev, pri_node)
4121 # set the physical (unique in bdev terms) id to None, meaning
4122 # detach from network
4123 dev.physical_id = (None,) * len(dev.physical_id)
4124 # and 'find' the device, which will 'fix' it to match the
4125 # standalone state
4126 if rpc.call_blockdev_find(pri_node, dev):
4127 done += 1
4128 else:
4129 warning("Failed to detach drbd %s from network, unusual case" %
4130 dev.iv_name)
4132 if not done:
4133 # no detaches succeeded (very unlikely)
4134 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4136 # if we managed to detach at least one, we update all the disks of
4137 # the instance to point to the new secondary
4138 info("updating instance configuration")
4139 for dev in instance.disks:
4140 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4141 cfg.SetDiskID(dev, pri_node)
4142 cfg.Update(instance)
4144 # and now perform the drbd attach
4145 info("attaching primary drbds to new secondary (standalone => connected)")
4147 for dev in instance.disks:
4148 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4149 # since the attach is smart, it's enough to 'find' the device,
4150 # it will automatically activate the network, if the physical_id
4151 # is correct
4152 cfg.SetDiskID(dev, pri_node)
4153 if not rpc.call_blockdev_find(pri_node, dev):
4154 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4155 "please do a gnt-instance info to see the status of disks")
4157 # this can fail as the old devices are degraded and _WaitForSync
4158 # does a combined result over all disks, so we don't check its
4159 # return value
4160 self.proc.LogStep(5, steps_total, "sync devices")
4161 _WaitForSync(cfg, instance, self.proc, unlock=True)
4163 # so check manually all the devices
4164 for name, (dev, old_lvs) in iv_names.iteritems():
4165 cfg.SetDiskID(dev, pri_node)
4166 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4167 if is_degr:
4168 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4170 self.proc.LogStep(6, steps_total, "removing old storage")
4171 for name, (dev, old_lvs) in iv_names.iteritems():
4172 info("remove logical volumes for %s" % name)
4173 for lv in old_lvs:
4174 cfg.SetDiskID(lv, old_node)
4175 if not rpc.call_blockdev_remove(old_node, lv):
4176 warning("Can't remove LV on old secondary",
4177 hint="Cleanup stale volumes by hand")
4179 def Exec(self, feedback_fn):
4180 """Execute disk replacement.
4182 This dispatches the disk replacement to the appropriate handler.
4184 """
4185 instance = self.instance
4186 if instance.disk_template == constants.DT_REMOTE_RAID1:
4187 fn = self._ExecRR1
4188 elif instance.disk_template == constants.DT_DRBD8:
4189 if self.op.remote_node is None:
4190 fn = self._ExecD8DiskOnly
4191 else:
4192 fn = self._ExecD8Secondary
4193 else:
4194 raise errors.ProgrammerError("Unhandled disk replacement case")
4195 return fn(feedback_fn)
4198 class LUQueryInstanceData(NoHooksLU):
4199 """Query runtime instance data.
4202 _OP_REQP = ["instances"]
4204 def CheckPrereq(self):
4205 """Check prerequisites.
4207 This only checks the optional instance list against the existing names.
4210 if not isinstance(self.op.instances, list):
4211 raise errors.OpPrereqError("Invalid argument type 'instances'")
4212 if self.op.instances:
4213 self.wanted_instances = []
4214 names = self.op.instances
4215 for name in names:
4216 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4217 if instance is None:
4218 raise errors.OpPrereqError("No such instance name '%s'" % name)
4219 self.wanted_instances.append(instance)
4220 else:
4221 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4222 in self.cfg.GetInstanceList()]
4226 def _ComputeDiskStatus(self, instance, snode, dev):
4227 """Compute block device status.
4230 self.cfg.SetDiskID(dev, instance.primary_node)
4231 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
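# dev_pstatus is the blockdev_find result on the primary node; dev_sstatus
# below is the same query against the secondary node (snode), when one is known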
4232 if dev.dev_type in constants.LDS_DRBD:
4233 # we change the snode then (otherwise we use the one passed in)
4234 if dev.logical_id[0] == instance.primary_node:
4235 snode = dev.logical_id[1]
4236 else:
4237 snode = dev.logical_id[0]
4239 if snode:
4240 self.cfg.SetDiskID(dev, snode)
4241 dev_sstatus = rpc.call_blockdev_find(snode, dev)
4242 else:
4243 dev_sstatus = None
4245 if dev.children:
4246 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4247 for child in dev.children]
4248 else:
4249 dev_children = []
4251 data = {
4252 "iv_name": dev.iv_name,
4253 "dev_type": dev.dev_type,
4254 "logical_id": dev.logical_id,
4255 "physical_id": dev.physical_id,
4256 "pstatus": dev_pstatus,
4257 "sstatus": dev_sstatus,
4258 "children": dev_children,
4259 }
4261 return data
4263 def Exec(self, feedback_fn):
4264 """Gather and return data"""
4265 result = {}
4266 for instance in self.wanted_instances:
4267 remote_info = rpc.call_instance_info(instance.primary_node,
4268 instance.name)
4269 if remote_info and "state" in remote_info:
4270 remote_state = "up"
4271 else:
4272 remote_state = "down"
4273 if instance.status == "down":
4274 config_state = "down"
4275 else:
4276 config_state = "up"
4278 disks = [self._ComputeDiskStatus(instance, None, device)
4279 for device in instance.disks]
4281 idict = {
4282 "name": instance.name,
4283 "config_state": config_state,
4284 "run_state": remote_state,
4285 "pnode": instance.primary_node,
4286 "snodes": instance.secondary_nodes,
4287 "os": instance.os,
4288 "memory": instance.memory,
4289 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4290 "disks": disks,
4291 "network_port": instance.network_port,
4292 "vcpus": instance.vcpus,
4293 "kernel_path": instance.kernel_path,
4294 "initrd_path": instance.initrd_path,
4295 "hvm_boot_order": instance.hvm_boot_order,
4296 }
4298 result[instance.name] = idict
4300 return result
4303 class LUSetInstanceParms(LogicalUnit):
4304 """Modifies an instance's parameters.
4307 HPATH = "instance-modify"
4308 HTYPE = constants.HTYPE_INSTANCE
4309 _OP_REQP = ["instance_name"]
4311 def BuildHooksEnv(self):
4312 """Build hooks env.
4314 This runs on the master, primary and secondaries.
4316 """
4317 args = dict()
4318 if self.mem:
4319 args['memory'] = self.mem
4320 if self.vcpus:
4321 args['vcpus'] = self.vcpus
4322 if self.do_ip or self.do_bridge or self.mac:
4323 if self.do_ip:
4324 ip = self.ip
4325 else:
4326 ip = self.instance.nics[0].ip
4327 if self.bridge:
4328 bridge = self.bridge
4329 else:
4330 bridge = self.instance.nics[0].bridge
4331 if self.mac:
4332 mac = self.mac
4333 else:
4334 mac = self.instance.nics[0].mac
4335 args['nics'] = [(ip, bridge, mac)]
4336 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4337 nl = [self.sstore.GetMasterNode(),
4338 self.instance.primary_node] + list(self.instance.secondary_nodes)
4339 return env, nl, nl
4341 def CheckPrereq(self):
4342 """Check prerequisites.
4344 This only checks the instance list against the existing names.
4347 self.mem = getattr(self.op, "mem", None)
4348 self.vcpus = getattr(self.op, "vcpus", None)
4349 self.ip = getattr(self.op, "ip", None)
4350 self.mac = getattr(self.op, "mac", None)
4351 self.bridge = getattr(self.op, "bridge", None)
4352 self.kernel_path = getattr(self.op, "kernel_path", None)
4353 self.initrd_path = getattr(self.op, "initrd_path", None)
4354 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4355 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4356 self.kernel_path, self.initrd_path, self.hvm_boot_order]
4357 if all_parms.count(None) == len(all_parms):
4358 raise errors.OpPrereqError("No changes submitted")
4359 if self.mem is not None:
4360 try:
4361 self.mem = int(self.mem)
4362 except ValueError, err:
4363 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4364 if self.vcpus is not None:
4365 try:
4366 self.vcpus = int(self.vcpus)
4367 except ValueError, err:
4368 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4369 if self.ip is not None:
4370 self.do_ip = True
4371 if self.ip.lower() == "none":
4372 self.ip = None
4373 else:
4374 if not utils.IsValidIP(self.ip):
4375 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4376 else:
4377 self.do_ip = False
4378 self.do_bridge = (self.bridge is not None)
4379 if self.mac is not None:
4380 if self.cfg.IsMacInUse(self.mac):
4381 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4382 self.mac)
4383 if not utils.IsValidMac(self.mac):
4384 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4386 if self.kernel_path is not None:
4387 self.do_kernel_path = True
4388 if self.kernel_path == constants.VALUE_NONE:
4389 raise errors.OpPrereqError("Can't set instance to no kernel")
4391 if self.kernel_path != constants.VALUE_DEFAULT:
4392 if not os.path.isabs(self.kernel_path):
4393 raise errors.OpPrereqError("The kernel path must be an absolute"
4394 " filename")
4395 else:
4396 self.do_kernel_path = False
4398 if self.initrd_path is not None:
4399 self.do_initrd_path = True
4400 if self.initrd_path not in (constants.VALUE_NONE,
4401 constants.VALUE_DEFAULT):
4402 if not os.path.isabs(self.initrd_path):
4403 raise errors.OpPrereqError("The initrd path must be an absolute"
4404 " filename")
4405 else:
4406 self.do_initrd_path = False
4408 # boot order verification
4409 if self.hvm_boot_order is not None:
4410 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4411 if len(self.hvm_boot_order.strip("acdn")) != 0:
4412 raise errors.OpPrereqError("invalid boot order specified,"
4413 " must be one or more of [acdn]"
4414 " or 'default'")
4416 instance = self.cfg.GetInstanceInfo(
4417 self.cfg.ExpandInstanceName(self.op.instance_name))
4418 if instance is None:
4419 raise errors.OpPrereqError("No such instance name '%s'" %
4420 self.op.instance_name)
4421 self.op.instance_name = instance.name
4422 self.instance = instance
4425 def Exec(self, feedback_fn):
4426 """Modifies an instance.
4428 All parameters take effect only at the next restart of the instance.
4429 """
4430 result = []
4431 instance = self.instance
4432 if self.mem:
4433 instance.memory = self.mem
4434 result.append(("mem", self.mem))
4435 if self.vcpus:
4436 instance.vcpus = self.vcpus
4437 result.append(("vcpus", self.vcpus))
4438 if self.do_ip:
4439 instance.nics[0].ip = self.ip
4440 result.append(("ip", self.ip))
4441 if self.bridge:
4442 instance.nics[0].bridge = self.bridge
4443 result.append(("bridge", self.bridge))
4444 if self.mac:
4445 instance.nics[0].mac = self.mac
4446 result.append(("mac", self.mac))
4447 if self.do_kernel_path:
4448 instance.kernel_path = self.kernel_path
4449 result.append(("kernel_path", self.kernel_path))
4450 if self.do_initrd_path:
4451 instance.initrd_path = self.initrd_path
4452 result.append(("initrd_path", self.initrd_path))
4453 if self.hvm_boot_order:
4454 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4455 instance.hvm_boot_order = None
4456 else:
4457 instance.hvm_boot_order = self.hvm_boot_order
4458 result.append(("hvm_boot_order", self.hvm_boot_order))
4460 self.cfg.AddInstance(instance)
4462 return result
4465 class LUQueryExports(NoHooksLU):
4466 """Query the exports list
4471 def CheckPrereq(self):
4472 """Check that the nodelist contains only existing nodes.
4475 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4477 def Exec(self, feedback_fn):
4478 """Compute the list of all the exported system images.
4481 a dictionary with the structure node->(export-list)
4482 where export-list is a list of the instances exported on
4483 that node.
4485 """
4486 return rpc.call_export_list(self.nodes)
4489 class LUExportInstance(LogicalUnit):
4490 """Export an instance to an image in the cluster.
4493 HPATH = "instance-export"
4494 HTYPE = constants.HTYPE_INSTANCE
4495 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4497 def BuildHooksEnv(self):
4498 """Build hooks env.
4500 This will run on the master, primary node and target node.
4502 """
4503 env = {
4504 "EXPORT_NODE": self.op.target_node,
4505 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4506 }
4507 env.update(_BuildInstanceHookEnvByObject(self.instance))
4508 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4509 self.op.target_node]
4510 return env, nl, nl
4512 def CheckPrereq(self):
4513 """Check prerequisites.
4515 This checks that the instance name is a valid one.
4518 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4519 self.instance = self.cfg.GetInstanceInfo(instance_name)
4520 if self.instance is None:
4521 raise errors.OpPrereqError("Instance '%s' not found" %
4522 self.op.instance_name)
4525 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4526 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4528 if self.dst_node is None:
4529 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4530 self.op.target_node)
4531 self.op.target_node = self.dst_node.name
4533 def Exec(self, feedback_fn):
4534 """Export an instance to an image in the cluster.
4537 instance = self.instance
4538 dst_node = self.dst_node
4539 src_node = instance.primary_node
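# the export path: take an LVM snapshot of the instance's 'sda' on the primary
# node, copy the snapshot to the target node, then remove the snapshot again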
4540 if self.op.shutdown:
4541 # shutdown the instance, but not the disks
4542 if not rpc.call_instance_shutdown(src_node, instance):
4543 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4544 (instance.name, src_node))
4546 vgname = self.cfg.GetVGName()
4548 snap_disks = []
4550 try:
4551 for disk in instance.disks:
4552 if disk.iv_name == "sda":
4553 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4554 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4556 if not new_dev_name:
4557 logger.Error("could not snapshot block device %s on node %s" %
4558 (disk.logical_id[1], src_node))
4559 else:
4560 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4561 logical_id=(vgname, new_dev_name),
4562 physical_id=(vgname, new_dev_name),
4563 iv_name=disk.iv_name)
4564 snap_disks.append(new_dev)
4566 finally:
4567 if self.op.shutdown and instance.status == "up":
4568 if not rpc.call_instance_start(src_node, instance, None):
4569 _ShutdownInstanceDisks(instance, self.cfg)
4570 raise errors.OpExecError("Could not start instance")
4572 # TODO: check for size
4574 for dev in snap_disks:
4575 if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4576 instance):
4577 logger.Error("could not export block device %s from node"
4578 " %s to node %s" %
4579 (dev.logical_id[1], src_node, dst_node.name))
4580 if not rpc.call_blockdev_remove(src_node, dev):
4581 logger.Error("could not remove snapshot block device %s from"
4582 " node %s" % (dev.logical_id[1], src_node))
4584 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4585 logger.Error("could not finalize export for instance %s on node %s" %
4586 (instance.name, dst_node.name))
4588 nodelist = self.cfg.GetNodeList()
4589 nodelist.remove(dst_node.name)
4591 # on one-node clusters nodelist will be empty after the removal
4592 # if we proceed the backup would be removed because OpQueryExports
4593 # substitutes an empty list with the full cluster node list.
4594 if nodelist:
4595 op = opcodes.OpQueryExports(nodes=nodelist)
4596 exportlist = self.proc.ChainOpCode(op)
4597 for node in exportlist:
4598 if instance.name in exportlist[node]:
4599 if not rpc.call_export_remove(node, instance.name):
4600 logger.Error("could not remove older export for instance %s"
4601 " on node %s" % (instance.name, node))
4604 class TagsLU(NoHooksLU):
4605 """Generic tags LU.
4607 This is an abstract class which is the parent of all the other tags LUs.
4609 """
4610 def CheckPrereq(self):
4611 """Check prerequisites.
4614 if self.op.kind == constants.TAG_CLUSTER:
4615 self.target = self.cfg.GetClusterInfo()
4616 elif self.op.kind == constants.TAG_NODE:
4617 name = self.cfg.ExpandNodeName(self.op.name)
4618 if name is None:
4619 raise errors.OpPrereqError("Invalid node name (%s)" %
4620 (self.op.name,))
4621 self.op.name = name
4622 self.target = self.cfg.GetNodeInfo(name)
4623 elif self.op.kind == constants.TAG_INSTANCE:
4624 name = self.cfg.ExpandInstanceName(self.op.name)
4625 if name is None:
4626 raise errors.OpPrereqError("Invalid instance name (%s)" %
4627 (self.op.name,))
4628 self.op.name = name
4629 self.target = self.cfg.GetInstanceInfo(name)
4630 else:
4631 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4632 str(self.op.kind))
4635 class LUGetTags(TagsLU):
4636 """Returns the tags of a given object.
4639 _OP_REQP = ["kind", "name"]
4641 def Exec(self, feedback_fn):
4642 """Returns the tag list.
4645 return self.target.GetTags()
4648 class LUSearchTags(NoHooksLU):
4649 """Searches the tags for a given pattern.
4652 _OP_REQP = ["pattern"]
4654 def CheckPrereq(self):
4655 """Check prerequisites.
4657 This checks the pattern passed for validity by compiling it.
4659 """
4660 try:
4661 self.re = re.compile(self.op.pattern)
4662 except re.error, err:
4663 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4664 (self.op.pattern, err))
4666 def Exec(self, feedback_fn):
4667 """Returns the tag list.
4669 """
4670 cfg = self.cfg
4671 tgts = [("/cluster", cfg.GetClusterInfo())]
4672 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4673 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4674 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4675 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4676 results = []
4677 for path, target in tgts:
4678 for tag in target.GetTags():
4679 if self.re.search(tag):
4680 results.append((path, tag))
4681 return results
4684 class LUAddTags(TagsLU):
4685 """Sets a tag on a given object.
4688 _OP_REQP = ["kind", "name", "tags"]
4690 def CheckPrereq(self):
4691 """Check prerequisites.
4693 This checks the type and length of the tag name and value.
4696 TagsLU.CheckPrereq(self)
4697 for tag in self.op.tags:
4698 objects.TaggableObject.ValidateTag(tag)
4700 def Exec(self, feedback_fn):
4701 """Sets the tag.
4703 """
4704 try:
4705 for tag in self.op.tags:
4706 self.target.AddTag(tag)
4707 except errors.TagError, err:
4708 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4709 try:
4710 self.cfg.Update(self.target)
4711 except errors.ConfigurationError:
4712 raise errors.OpRetryError("There has been a modification to the"
4713 " config file and the operation has been"
4714 " aborted. Please retry.")
4717 class LUDelTags(TagsLU):
4718 """Delete a list of tags from a given object.
4721 _OP_REQP = ["kind", "name", "tags"]
4723 def CheckPrereq(self):
4724 """Check prerequisites.
4726 This checks that we have the given tag.
4729 TagsLU.CheckPrereq(self)
4730 for tag in self.op.tags:
4731 objects.TaggableObject.ValidateTag(tag)
4732 del_tags = frozenset(self.op.tags)
4733 cur_tags = self.target.GetTags()
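# del_tags must be a subset of the object's current tags; any requested tag
# that is not present is reported in the error below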
4734 if not del_tags <= cur_tags:
4735 diff_tags = del_tags - cur_tags
4736 diff_names = ["'%s'" % tag for tag in diff_tags]
4738 raise errors.OpPrereqError("Tag(s) %s not found" %
4739 (",".join(diff_names)))
4741 def Exec(self, feedback_fn):
4742 """Remove the tag from the object.
4745 for tag in self.op.tags:
4746 self.target.RemoveTag(tag)
4747 try:
4748 self.cfg.Update(self.target)
4749 except errors.ConfigurationError:
4750 raise errors.OpRetryError("There has been a modification to the"
4751 " config file and the operation has been"
4752 " aborted. Please retry.")
4754 class LUTestDelay(NoHooksLU):
4755 """Sleep for a specified amount of time.
4757 This LU sleeps on the master and/or nodes for a specified amount of
4758 time.
4760 """
4761 _OP_REQP = ["duration", "on_master", "on_nodes"]
4763 def CheckPrereq(self):
4764 """Check prerequisites.
4766 This checks that we have a good list of nodes and/or the duration
4767 is valid.
4769 """
4771 if self.op.on_nodes:
4772 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4774 def Exec(self, feedback_fn):
4775 """Do the actual sleep.
4778 if self.op.on_master:
4779 if not utils.TestDelay(self.op.duration):
4780 raise errors.OpExecError("Error during master delay test")
4781 if self.op.on_nodes:
4782 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4783 if not result:
4784 raise errors.OpExecError("Complete failure from rpc call")
4785 for node, node_result in result.items():
4786 if not node_result:
4787 raise errors.OpExecError("Failure during rpc call to node %s,"
4788 " result: %s" % (node, node_result))
4791 def _IAllocatorGetClusterData(cfg, sstore):
4792 """Compute the generic allocator input data.
4794 This is the data that is independent of the actual operation.
4796 """
4798 data = {
4800 "cluster_name": sstore.GetClusterName(),
4801 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4802 # we don't have job IDs
4803 }
4806 node_results = {}
4807 node_list = cfg.GetNodeList()
4808 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
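# sanity-check the RPC results before converting each node's info into the
# dictionary format the allocator expects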
4809 for nname in node_list:
4810 ninfo = cfg.GetNodeInfo(nname)
4811 if nname not in node_data or not isinstance(node_data[nname], dict):
4812 raise errors.OpExecError("Can't get data for node %s" % nname)
4813 remote_info = node_data[nname]
4814 for attr in ['memory_total', 'memory_free',
4815 'vg_size', 'vg_free']:
4816 if attr not in remote_info:
4817 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4818 (nname, attr))
4819 try:
4820 int(remote_info[attr])
4821 except ValueError, err:
4822 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4823 " %s" % (nname, attr, str(err)))
4824 pnr = {
4825 "tags": list(ninfo.GetTags()),
4826 "total_memory": utils.TryConvert(int, remote_info['memory_total']),
4827 "free_memory": utils.TryConvert(int, remote_info['memory_free']),
4828 "total_disk": utils.TryConvert(int, remote_info['vg_size']),
4829 "free_disk": utils.TryConvert(int, remote_info['vg_free']),
4830 "primary_ip": ninfo.primary_ip,
4831 "secondary_ip": ninfo.secondary_ip,
4832 }
4833 node_results[nname] = pnr
4834 data["nodes"] = node_results
4837 instance_data = {}
4838 i_list = cfg.GetInstanceList()
4839 for iname in i_list:
4840 iinfo = cfg.GetInstanceInfo(iname)
4841 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4842 for n in iinfo.nics]
4843 pir = {
4844 "tags": list(iinfo.GetTags()),
4845 "should_run": iinfo.status == "up",
4846 "vcpus": iinfo.vcpus,
4847 "memory": iinfo.memory,
4848 "os": iinfo.os,
4849 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4850 "nics": nic_data,
4851 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4852 "disk_template": iinfo.disk_template,
4853 }
4854 instance_data[iname] = pir
4856 data["instances"] = instance_data
4858 return data
4861 def _IAllocatorAddNewInstance(data, op):
4862 """Add new instance data to allocator structure.
4864 This in combination with _IAllocatorGetClusterData will create the
4865 correct structure needed as input for the allocator.
4867 The checks for the completeness of the opcode must have already been
4868 done.
4870 """
4871 if len(op.disks) != 2:
4872 raise errors.OpExecError("Only two-disk configurations supported")
4874 disk_space = _ComputeDiskSize(op.disk_template,
4875 op.disks[0]["size"], op.disks[1]["size"])
4877 request = {
4878 "type": "allocate",
4879 "name": op.name,
4880 "disk_template": op.disk_template,
4881 "tags": op.tags,
4882 "os": op.os,
4883 "vcpus": op.vcpus,
4884 "memory": op.mem_size,
4885 "disks": op.disks,
4886 "disk_space_total": disk_space,
4887 "nics": op.nics,
4888 }
4889 data["request"] = request
4892 def _IAllocatorAddRelocateInstance(data, op):
4893 """Add relocate instance data to allocator structure.
4895 This in combination with _IAllocatorGetClusterData will create the
4896 correct structure needed as input for the allocator.
4898 The checks for the completeness of the opcode must have already been
4899 done.
4901 """
4902 request = {
4903 "type": "replace_secondary",
4904 "name": op.name,
4905 }
4906 data["request"] = request
4909 def _IAllocatorRun(name, data):
4910 """Run an instance allocator and return the results.
4913 alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
4914 os.path.isfile)
4915 if alloc_script is None:
4916 raise errors.OpExecError("Can't find allocator '%s'" % name)
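# the allocator input is written to a temporary file whose name is passed as
# the only argument to the allocator script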
4918 fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
4919 try:
4920 os.write(fd, data)
4921 os.close(fd)
4922 result = utils.RunCmd([alloc_script, fin_name])
4923 if result.failed:
4924 raise errors.OpExecError("Instance allocator call failed: %s,"
4925 " output: %s" %
4926 (result.fail_reason, result.stdout))
4927 finally:
4928 os.unlink(fin_name)
4929 return result.stdout
4932 def _IAllocatorValidateResult(data):
4933 """Process the allocator results.
4935 """
4936 try:
4937 rdict = serializer.Load(data)
4938 except Exception, err:
4939 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4941 if not isinstance(rdict, dict):
4942 raise errors.OpExecError("Can't parse iallocator results: not a dict")
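# a well-formed reply therefore looks roughly like (illustrative sketch, not
# taken from the source):
#   {"success": true, "info": "allocation successful", "nodes": ["node1"]}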
4944 for key in "success", "info", "nodes":
4945 if key not in rdict:
4946 raise errors.OpExecError("Can't parse iallocator results:"
4947 " missing key '%s'" % key)
4949 if not isinstance(rdict["nodes"], list):
4950 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4951 " not a list")
4952 return rdict
4955 class LUTestAllocator(NoHooksLU):
4956 """Run allocator tests.
4958 This LU runs the allocator tests
4961 _OP_REQP = ["direction", "mode", "name"]
4963 def CheckPrereq(self):
4964 """Check prerequisites.
4966 This checks the opcode parameters depending on the direction and mode.
4969 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4970 for attr in ["name", "mem_size", "disks", "disk_template",
4971 "os", "tags", "nics", "vcpus"]:
4972 if not hasattr(self.op, attr):
4973 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4974 attr)
4975 iname = self.cfg.ExpandInstanceName(self.op.name)
4976 if iname is not None:
4977 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4978 iname)
4979 if not isinstance(self.op.nics, list):
4980 raise errors.OpPrereqError("Invalid parameter 'nics'")
4981 for row in self.op.nics:
4982 if (not isinstance(row, dict) or
4983 "mac" not in row or
4984 "ip" not in row or
4985 "bridge" not in row):
4986 raise errors.OpPrereqError("Invalid contents of the"
4987 " 'nics' parameter")
4988 if not isinstance(self.op.disks, list):
4989 raise errors.OpPrereqError("Invalid parameter 'disks'")
4990 if len(self.op.disks) != 2:
4991 raise errors.OpPrereqError("Only two-disk configurations supported")
4992 for row in self.op.disks:
4993 if (not isinstance(row, dict) or
4994 "size" not in row or
4995 not isinstance(row["size"], int) or
4996 "mode" not in row or
4997 row["mode"] not in ['r', 'w']):
4998 raise errors.OpPrereqError("Invalid contents of the"
4999 " 'disks' parameter")
5000 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5001 if not hasattr(self.op, "name"):
5002 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5003 fname = self.cfg.ExpandInstanceName(self.op.name)
5004 if fname is None:
5005 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5006 self.op.name)
5007 self.op.name = fname
5008 else:
5009 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5010 self.op.mode)
5012 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5013 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5014 raise errors.OpPrereqError("Missing allocator name")
5015 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5016 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5017 self.op.direction)
5019 def Exec(self, feedback_fn):
5020 """Run the allocator test.
5023 data = _IAllocatorGetClusterData(self.cfg, self.sstore)
5024 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5025 _IAllocatorAddNewInstance(data, self.op)
5026 else:
5027 _IAllocatorAddRelocateInstance(data, self.op)
5029 text = serializer.Dump(data)
5030 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5031 result = text
5032 else:
5033 result = _IAllocatorRun(self.op.allocator, text)