4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_CLUSTER,
      REQ_MASTER); note that all commands require root permissions

  """
  # NOTE(review): this block was reconstructed from a truncated copy;
  # the class attributes and the statements restored below follow the
  # conventional defaults implied by the visible code — verify against
  # the project history before relying on them.
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_CLUSTER = True
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
    validity.

    """
    self.processor = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    # every required opcode parameter must be present (even if None was
    # explicitly assigned by the caller it would not pass this check)
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    if self.REQ_CLUSTER:
      if not cfg.IsCluster():
        raise errors.OpPrereqError("Cluster not initialized yet,"
                                   " use 'gnt-cluster init' first.")
      if self.REQ_MASTER:
        master = sstore.GetMasterNode()
        if master != utils.HostInfo().name:
          raise errors.OpPrereqError("Commands must be run on the master"
                                     " node %s" % master)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    As for the node lists, the master should not be included in the
    them, as it will be added by the hooks runner in case this LU
    requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  # no hook path/type: the hooks runner skips LUs with HPATH None
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Build hooks env.

    This is a no-op, since we don't run hooks.

    """
    # empty env and node lists, per the LogicalUnit.BuildHooksEnv contract
    return {}, [], []
def _AddHostToEtcHosts(hostname):
  """Wrapper around utils.SetEtcHostsEntry.

  Registers the host's ip, fqdn and short-name alias in the system
  hosts file (presumably after DNS resolution via utils.HostInfo —
  confirm against the utils module).

  """
  hi = utils.HostInfo(name=hostname)
  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
def _RemoveHostFromEtcHosts(hostname):
  """Wrapper around utils.RemoveEtcHostsEntry.

  Removes both the fqdn and the short-name entries for the host from
  the system hosts file.

  """
  hi = utils.HostInfo(name=hostname)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    lu: the logical unit on whose behalf we check (supplies lu.cfg)
    nodes: List of nodes (strings) or None for all

  Raises:
    errors.OpPrereqError: if nodes is not a list or a name is unknown

  """
  # NOTE(review): the if/else skeleton was missing in this copy and has
  # been restored to the obvious structure implied by the fragments.
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []
    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    # no explicit list given: operate on every configured node
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    lu: the logical unit on whose behalf we check (supplies lu.cfg)
    instances: List of instances (strings) or None for all

  Raises:
    errors.OpPrereqError: if instances is not a list or a name is unknown

  """
  # NOTE(review): the if/else skeleton was missing in this copy and has
  # been restored to mirror _GetWantedNodes.
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    # no explicit list given: operate on every configured instance
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)
233 def _CheckOutputFields(static, dynamic, selected):
234 """Checks whether all selected fields are valid.
237 static: Static fields
238 dynamic: Dynamic fields
241 static_fields = frozenset(static)
242 dynamic_fields = frozenset(dynamic)
244 all_fields = static_fields | dynamic_fields
246 if not all_fields.issuperset(selected):
247 raise errors.OpPrereqError("Unknown output fields selected: %s"
248 % ",".join(frozenset(selected).
249 difference(all_fields)))
252 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
253 memory, vcpus, nics):
254 """Builds instance related env variables for hooks from single variables.
257 secondary_nodes: List of secondary nodes as strings
261 "INSTANCE_NAME": name,
262 "INSTANCE_PRIMARY": primary_node,
263 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
264 "INSTANCE_OS_TYPE": os_type,
265 "INSTANCE_STATUS": status,
266 "INSTANCE_MEMORY": memory,
267 "INSTANCE_VCPUS": vcpus,
271 nic_count = len(nics)
272 for idx, (ip, bridge, mac) in enumerate(nics):
275 env["INSTANCE_NIC%d_IP" % idx] = ip
276 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
277 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
281 env["INSTANCE_NIC_COUNT"] = nic_count
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  Returns:
    The env dict produced by _BuildInstanceHookEnv.

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # BUG FIX: this used instance.os (copy/paste from os_type above),
    # so INSTANCE_STATUS in hooks carried the OS name instead of the
    # instance's run status
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  # guard the update: override defaults to None, and dict.update(None)
  # would raise TypeError
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
def _UpdateKnownHosts(fullnode, ip, pubkey):
  """Ensure a node has a correct known_hosts entry.

  Args:
    fullnode - Fully qualified domain name of host. (str)
    ip - IPv4 address of host (str)
    pubkey - the public key of the cluster

  """
  # NOTE(review): this copy of the function is missing many original
  # lines (the else-branch keyword, the line-reading loop header, the
  # havesome/haveall flag bookkeeping, file close calls and the final
  # append branch).  The statements below are kept verbatim; restore
  # the gaps from upstream before relying on this code.
  if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
    # presumably in an elided else-branch: create the file when missing
    f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
    # per-line scan of the existing file, deciding what to keep
    logger.Debug('read %s' % (repr(rawline),))
    parts = rawline.rstrip('\r\n').split()
    # Ignore unwanted lines
    if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
      fields = parts[0].split(',')
      # check whether this line already names both the ip and the fqdn
      for spec in [ ip, fullnode ]:
        if spec not in fields:
      logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
      if haveall and key == pubkey:
        # line already matches the wanted entry: keep it unchanged
        save_lines.append(rawline)
        logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
      if havesome and (not haveall or key != pubkey):
        # stale/partial entry for this host: drop it
        logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
    save_lines.append(rawline)
  # no matching entry found: schedule a fresh one
  add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
  logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
  save_lines = save_lines + add_lines
  # Write a new file and replace old.
  fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
  newfile = os.fdopen(fd, 'w')
  newfile.write(''.join(save_lines))
  logger.Debug("Wrote new known_hosts.")
  os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
  # Simply appending a new line will do the trick.
  for add in add_lines:
387 def _HasValidVG(vglist, vgname):
388 """Checks if the volume group list is valid.
390 A non-None return value means there's an error, and the return value
391 is the error message.
394 vgsize = vglist.get(vgname, None)
396 return "volume group '%s' missing" % vgname
398 return ("volume group '%s' too small (20480MiB required, %dMib found)" %
def _InitSSHSetup(node):
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  Args:
    node: the name of this host as a fqdn

  """
  # NOTE(review): this copy is missing several original lines (the rest
  # of the ssh-keygen argument list, the failure check before the
  # raise, and file close/cleanup).  Kept verbatim; restore from
  # upstream before use.
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  # back up and remove any pre-existing keypair before regenerating
  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %

  # register the freshly generated public key as authorized
  f = open(pub_key, 'r')
    utils.AddAuthorizedKey(auth_keys, f.read(8192))
def _InitGanetiServerSetup(ss):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster and also generates the SSL certificate.

  Args:
    ss: the SimpleStore instance used to persist the node password

  """
  # NOTE(review): the `if result.failed:` guards before both raises
  # were elided in this copy; restore from upstream before use.
  # Create pseudo random password
  randpass = sha.new(os.urandom(64)).hexdigest()
  # and write it into sstore
  ss.SetKey(ss.SS_NODED_PASS, randpass)

  # self-signed certificate, valid ~5 years, key and cert in one file
  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
                         "-days", str(365*5), "-nodes", "-x509",
                         "-keyout", constants.SSL_CERT_FILE,
                         "-out", constants.SSL_CERT_FILE, "-batch"])
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))

  # the file holds the private key as well: owner-read only
  os.chmod(constants.SSL_CERT_FILE, 0400)

  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])

    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  Queries the instance's primary node (via rpc) for all of the
  instance's NIC bridges and raises if any is absent.

  Raises:
    errors.OpPrereqError: if one or more bridges are missing

  """
  # check bridges existance
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))
class LUInitCluster(LogicalUnit):
  """Initialise the cluster.

  NOTE(review): this copy of the class is missing scattered original
  lines (an error-message continuation, several `if` guards, a
  condition opener and file close calls); the fragments are kept
  verbatim — restore the gaps from upstream before use.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
              "def_bridge", "master_netdev"]

  def BuildHooksEnv(self):
    """Build hooks env.

    Notes: Since we don't require a cluster, we must manually add
    ourselves in the post-run node list.

    """
    env = {"OP_TARGET": self.op.cluster_name}
    return env, [], [self.hostname.name]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    if config.ConfigWriter.IsCluster():
      raise errors.OpPrereqError("Cluster is already initialised")

    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        # (message continuation elided in this copy)
        raise errors.OpPrereqError("Please prepare the cluster VNC"
                                   constants.VNC_PASSWORD_FILE)

    self.hostname = hostname = utils.HostInfo()

    if hostname.ip.startswith("127."):
      raise errors.OpPrereqError("This host's IP resolves to the private"
                                 " range (%s). Please fix DNS or %s." %
                                 (hostname.ip, constants.ETC_HOSTS))

    # the name we resolve to must actually be an address of this host
    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
                         source=constants.LOCALHOST_IP_ADDRESS):
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname.ip)

    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)

    # the cluster IP must not already be in use somewhere
    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
      raise errors.OpPrereqError("Cluster IP already active. Aborting.")

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip and not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary ip given")
        # (condition opener elided in this copy)
        secondary_ip != hostname.ip and
        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=constants.LOCALHOST_IP_ADDRESS))):
      raise errors.OpPrereqError("You gave %s as secondary IP,"
                                 " but it does not belong to this host." %
    self.secondary_ip = secondary_ip

    # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
      raise errors.OpPrereqError("Error: %s" % vgstatus)

    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %

    if self.op.hypervisor_type not in constants.HYPER_TYPES:
      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
                                 self.op.hypervisor_type)

    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))

    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
      raise errors.OpPrereqError("Init.d script '%s' missing or not"
                                 " executable." % constants.NODE_INITD_SCRIPT)

  def Exec(self, feedback_fn):
    """Initialize the cluster.

    """
    clustername = self.clustername
    hostname = self.hostname

    # set up the simple store
    self.sstore = ss = ssconf.SimpleStore()
    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)

    # set up the inter-node password and certificate
    _InitGanetiServerSetup(ss)

    # start the master ip
    rpc.call_node_start_master(hostname.name)

    # set up ssh config and /etc/hosts
    # (the readline and close around this were elided in this copy)
    f = open(constants.SSH_HOST_RSA_PUB, 'r')
    sshkey = sshline.split(" ")[1]

    _AddHostToEtcHosts(hostname.name)

    _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)

    _InitSSHSetup(hostname.name)

    # init of cluster config file
    self.cfg = cfgw = config.ConfigWriter()
    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
                    sshkey, self.op.mac_prefix,
                    self.op.vg_name, self.op.def_bridge)
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  NOTE(review): this copy is missing the _OP_REQP class attribute and
  the `if instancelist:` guard in CheckPrereq — restore from upstream.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    # only the master node itself may remain in the configuration
    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
      # (guard `if instancelist:` elided in this copy)
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    # preserve the root ssh keys before the node leaves the cluster
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)
class LUVerifyCluster(NoHooksLU):
  """Verifies the cluster status.

  NOTE(review): this copy of the class is missing many original lines
  (the `bad` error-flag bookkeeping, `return`/`continue` statements,
  several `else:` branches, local dict initialisations and call
  continuations).  The fragments are kept verbatim — restore the gaps
  from upstream before use.

  """
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existance and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))

    # checks vg existance and size > 20G
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %

    vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
      feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))

    # checks config file checksum
    if 'filelist' not in node_result:
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      feedback_fn(" - ERROR: node hasn't returned node connectivity data")
      if node_result['nodelist']:
        # NOTE(review): inner loop shadows the `node` parameter
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    node_current = instanceconfig.primary_node

    instanceconfig.MapLVsByNode(node_vol_should)

    # every LV the instance needs must actually be present on its node
    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))

    # the instance must not be running anywhere but its primary
    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        # (needed_mem initialisation elided in this copy)
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    # FIXME: verify OS list
    # the files that must be identical on every node
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)

      # node_volume: per-node map of the LVs actually present
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        node_volume[node] = volumeinfo

      # node_instance: per-node list of running instances
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn(" - ERROR: connection to %s failed" % (node,))

      node_instance[node] = nodeinstance

      # node_info: per-node resource data
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        feedback_fn(" - ERROR: instance %s, connection to secondary node"
                    " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)

    feedback_fn("* Other Notes")
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  NOTE(review): this copy is missing several original lines (a
  `continue`, the per-node loop header, local dict initialisations and
  the final `return result`) — restore from upstream before use.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    # four-way result: offline nodes, per-node lvm errors, instances
    # with inactive disks, and missing LVs per instance
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    for inst in instances:
      # only running, network-mirrored instances are of interest here
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    node_lvs = rpc.call_volume_list(nodes, vg_name)
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
        res_nodes.append(node)

      # flag instances whose LVs exist but are not online
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  NOTE(review): this copy is missing scattered original lines
  (_OP_REQP, `env = {` in BuildHooksEnv, the `ss = self.sstore`
  binding and try/finally in Exec, and some message continuations) —
  restore from upstream before use.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
      # (dict opener and docstring elided in this copy)
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # the new IP must not be live anywhere on the network
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    ss.SetKey(ss.SS_MASTER_IP, ip)
    ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

    # Distribute updated ss config to all nodes
    myself = self.cfg.GetNodeInfo(master)
    dist_nodes = self.cfg.GetNodeList()
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying updated ssconf data to all nodes")
    for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
      fname = ss.KeyToFilename(keyname)
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %

    if not rpc.call_node_start_master(master):
      logger.Error("Could not re-enable the master role on the master,"
                   " please restart manually.")
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  NOTE(review): this copy is missing the polling-loop skeleton (retry
  loop header, `done`/`max_time` bookkeeping, early returns and
  else-branches) — the fragments are kept verbatim; restore from
  upstream before use.

  Returns True when the disks are fully in sync (not degraded).

  """
  if not instance.disks:

  proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
      proc.LogWarning("Can't get any data from node %s" % node)
      # repeated failures end up here (retry counter elided in this copy)
      raise errors.RemoteError("Can't contact node %s for mirror data,"
                               " aborting." % node)
    for i in range(len(rstats)):
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          # (else-branch keyword elided in this copy)
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    # cap the poll interval at one minute
    time.sleep(min(60, max_time))

  proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  NOTE(review): this copy is missing the `result` initialisation, the
  rstats validity check and the final `return result` — restore from
  upstream before use.

  """
  cfgw.SetDiskID(dev, node)

  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = result and (not rstats[idx])

  # recurse into children; children are always checked as non-ldisk
  for child in dev.children:
    result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  NOTE(review): this copy is missing the _OP_REQP attribute and the
  result-processing/return at the end of Exec — restore from upstream.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    # NOTE(review): `== False` is presumably deliberate here (the rpc
    # layer seems to signal total failure with the literal False, while
    # an empty result would also be falsy) — confirm against the rpc
    # module before "fixing" it to a truthiness test.
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks environment for node removal.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    # hooks run on all remaining nodes, both pre and post
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the node exists in the configuration
      - it does not have primary or secondary instances
      - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    # NOTE(review): old-style 'raise Exc, (args)' statement (Python 2 only);
    # the rest of the file uses 'raise Exc(args)' — should be made consistent
    raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    # store the canonical (expanded) node name back into the opcode
    self.op.node_name = node.name

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logger.Info("stopping the node daemon and removing configs from node %s" %
    # tell the node to leave the cluster (removes its local config data)
    rpc.call_node_leave_cluster(node.name)
    # stop the node daemon via its init script
    ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)
    # drop the node's entry from /etc/hosts on the master
    _RemoveHostFromEtcHosts(node.name)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # fields that need a live RPC to the nodes, vs. static config data
    self.dynamic_fields = frozenset(["dtotal", "dfree",
                                     "mtotal", "mnode", "mfree",
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    # only issue the (expensive) node RPC if a dynamic field was requested
    if self.dynamic_fields.intersection(self.op.output_fields):
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
          "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
          "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
          "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
          "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
          "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
          "bootid": nodeinfo['bootid'],
          # no usable answer from this node: empty live data
          live_data[name] = {}
      # no dynamic field requested: empty live data for every node
      live_data = dict.fromkeys(nodenames, {})

    # map node name -> set of primary/secondary instance names on it
    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    # compute instance->node mappings only if instance fields were asked for
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    for node in nodelist:
      for field in self.op.output_fields:
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          # dynamic fields come from the live RPC data gathered above
          val = live_data[node.name].get(field, None)
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    # ask every wanted node for its logical volumes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    # instance -> per-node LV names, used to attribute LVs to instances
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    for node in nodenames:
      # skip nodes that returned no data
      if node not in volumes or not volumes[node]:

      # sort a copy by device name for stable output
      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        for field in self.op.output_fields:
          elif field == "phys":
          elif field == "name":
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find which instance (if any) owns this LV on this node
            if node not in lv_by_node[inst]:
            if vol['name'] in lv_by_node[inst][node]:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks environment for node addition.

    This will run on all nodes before, and on all nodes + the new node after.

    """
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
    nodes_0 = self.cfg.GetNodeList()
    # post hooks also run on the node being added
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
      - the new node is not already in the config
      - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name

    # resolve the name via DNS (raises if resolution fails)
    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      # single-homed node: the secondary IP is the primary one
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip
    node_list = cfg.GetNodeList()
    if node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration"

    # refuse IPs that clash with any existing node's addresses
    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      raise errors.OpPrereqError("The master has a private ip but the"
                                 " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    # HVM clusters need the shared VNC password file present
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    # the password is embedded in a shell command below: restrict its charset
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    gntpem = f.read(8192)

    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    raise errors.OpExecError("Remote command on node %s, error: %s,"
                             (node, result.fail_reason, result.output))

    # check connectivity

    result = rpc.call_version([node])[node]

    if constants.PROTOCOL_VERSION == result:
      logger.Info("communication to node %s fine, sw version %s match" %
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result))
    raise errors.OpExecError("Cannot get version from the new node")

    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
      keyarray.append(f.read())

    # push the host keys and the cluster ssh identity to the new node
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    _AddHostToEtcHosts(new_node.name)

    _UpdateKnownHosts(new_node.name, new_node.primary_ip,
                      self.cfg.GetHostKey())

    if new_node.secondary_ip != new_node.primary_ip:
      # verify the node can actually reach itself on its claimed secondary IP
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = ssh.VerifyNodeHostname(node)
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList() + [node]
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %

    # copy the simple-store files (and VNC password, on HVM) to the new node
    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    logger.Info("adding node %s to cluster.conf" % node)
    self.cfg.AddNode(new_node)
class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks environment for master failover.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    # the node this LU runs on becomes the new master
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This commands must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      # not fatal: continue the failover, but ask the operator to clean up
      logger.Error("could disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    # propagate the new master name to every node's simple store
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
      # static software/protocol versions plus the current master name
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    The target nodes come from self.op.nodes; copy failures are logged
    but not fatal.

    """
    filename = self.op.filename

    # our own name — presumably used to skip the master itself during
    # the copy; the comparison is not visible in this chunk (verify)
    myname = utils.HostInfo().name

    for node in self.nodes:
      if not ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """

  def CheckPrereq(self):
    """No prerequisites.

    """

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    # delegate entirely to the config writer
    return self.cfg.DumpConfig()
class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    Returns per-node (name, output, exit_code) tuples.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    for node in self.nodes:
      result = ssh.SSHCall(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    # assemble all the instance's block devices on their nodes
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    raise errors.OpExecError("Cannot activate block devices")
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    suceeded with the mapping from node devices to instance devices

  """
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      logger.Error("could not prepare block device %s on node %s"
                   " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
      if not ignore_secondaries:

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      logger.Error("could not prepare block device %s on node %s"
                   " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  On assembly failure the disks are shut down again and OpExecError is
  raised.

  """
  # pass 'force' through so secondary-node errors can be ignored
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
    # assembly failed: roll back whatever was brought up
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    # a non-list result means the RPC itself failed
    # NOTE(review): isinstance(ins_l, list) would be the idiomatic check
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    # refuse to take the disks away from a running instance
    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"

    _ShutdownInstanceDisks(instance, self.cfg)
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If the ignore_primary is false, errors on the primary node are
  treated as failures; with ignore_primary set, only errors on
  non-primary nodes count (see the guard below).

  """
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        # primary-node failures count unless explicitly ignored
        if not ignore_primary or node != instance.primary_node:
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Verify that a node has at least `requested` MiB of free memory.

  Queries the node via RPC for its resource information and raises
  errors.OpPrereqError if the node cannot be contacted, if the free
  memory cannot be determined, or if it is lower than requested.

  Args:
    cfg: a ConfigWriter instance
    node: the node name
    reason: string to use in the error message
    requested: the amount of memory in MiB

  """
  node_info = rpc.call_node_info([node], cfg.GetVGName())
  # a missing or malformed answer means the RPC layer failed
  if not node_info or not isinstance(node_info, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  avail = node_info[node].get('memory_free')
  if not isinstance(avail, int):
    # the node answered, but without a usable memory figure
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, avail))

  if requested > avail:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, avail))
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks environment.

    This runs on master, primary and secondary nodes of the instance.

    """
      "FORCE": self.op.force,
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridge existence
    _CheckInstanceBridgesExist(instance)

    # make sure the primary node can hold the instance's memory
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    # mark the instance as up in the config before actually starting it
    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      # starting failed: don't leave the disks assembled
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks environment.

    This runs on master, primary and secondary nodes of the instance.

    """
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridge existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    # validate the requested reboot type up front
    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    # soft/hard reboots are delegated to the hypervisor on the node
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
      # full reboot: stop the instance, recycle the disks, start it again
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks environment.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    # record the instance as down even if the RPC below fails
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      # best-effort: log the failure and still release the disks below
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks environment.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # double-check with the node itself that the instance isn't running
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS change requested: the target OS must exist on the primary node
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      # persist the OS change before running the create scripts
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    feedback_fn("Running the instance OS create scripts...")
    if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
      raise errors.OpExecError("Could not install OS for instance %s"
                               (inst.name, inst.primary_node))
    # shut the disks down again after the install step
    _ShutdownInstanceDisks(inst, self.cfg)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  # Required opcode parameters, checked by LogicalUnit.__init__.
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    # Hooks run on the master plus every node hosting the instance.
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    # NOTE(review): the usual "return env, nl, nl" is not visible in
    # this copy -- verify against the original file.

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    # The configuration must say the instance is stopped...
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # ...and the primary node must agree (no live copy running).
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    # NOTE(review): an "if remote_info:" guard appears to be missing
    # before this raise in this copy -- verify against the original.
    raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                               (self.op.instance_name,
                                instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %

    if not getattr(self.op, "ignore_ip", False):
      # Make sure the new name's IP is not already live on the network.
      command = ["fping", "-q", name_info.ip]
      result = utils.RunCmd(command)
      if not result.failed:
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    Renames it in the configuration first, then runs the OS rename
    script on the primary node.

    """
    inst = self.instance
    old_name = inst.name

    self.cfg.RenameInstance(inst.name, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    _StartInstanceDisks(self.cfg, inst, None)
    if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
      msg = ("Could run OS rename script for instance %s on node %s (but the"
             " instance has been renamed in Ganeti)" %
             (inst.name, inst.primary_node))
    _ShutdownInstanceDisks(inst, self.cfg)
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    # Only the master node runs the removal hooks.
    nl = [self.sstore.GetMasterNode()]
    # NOTE(review): the usual "return env, nl, nl" is not visible in
    # this copy -- verify against the original file.

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    Shuts the instance down, removes its block devices and finally
    drops it from the cluster configuration.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      # NOTE(review): an "else:" before this raise appears to be
      # missing in this copy -- as written the error would be raised
      # even when failures are ignored; verify against the original.
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                               (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      # NOTE(review): same apparently-missing "else:" as above.
      raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    # Fields that need live data from the nodes, as opposed to data
    # available from the cluster configuration alone.
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    # NOTE(review): several lines of this method (loop headers,
    # initialisations, some "val = ..." branches and the final return)
    # are not visible in this copy; comments below describe only the
    # visible logic. Verify against the original file.
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    # Live data (running state, used memory) is only fetched when at
    # least one dynamic field was requested.
    if self.dynamic_fields.intersection(self.op.output_fields):
      node_data = rpc.call_all_instances_info(nodes)
      result = node_data[name]
        live_data.update(result)
      elif result == False:
        bad_nodes.append(name)
        # else no instance is alive
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    for instance in instance_list:
      for field in self.op.output_fields:
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          # What the configuration says, not the live state.
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
            running = bool(live_data.get(instance.name))
            if instance.status != "down":
            if instance.status != "down":
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
        elif field == "disk_template":
          val = instance.disk_template
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          # The field name encodes which disk ("sda"/"sdb") to look up.
          disk = instance.FindDisk(field[:3])
        elif field == "vcpus":
          val = instance.vcpus
          raise errors.ParameterError(field)
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    # NOTE(review): the "env = {...}" literal's opening/closing lines
    # and the final "return env, nl, nl" are not visible in this copy.

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # Only net-mirrored templates can fail over to a secondary node.
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "DT_REMOTE_RAID1 template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        # Degraded target disks abort the failover unless the caller
        # explicitly asked to ignore consistency (or the instance is
        # down anyway).
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      # NOTE(review): an "else:" before this raise appears to be
      # missing in this copy -- verify against the original file.
      raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                               (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      # NOTE(review): an "if not disks_ok:" guard appears to be missing
      # before the cleanup/raise below -- verify against the original.
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  # Recurse into the children first: a compound device can only be
  # created once its component devices exist.
  for child in device.children:
    if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  # Remember the node-assigned physical id if it was not known before.
  if device.physical_id is None:
    device.physical_id = new_id
  # NOTE(review): the failure returns ("return False") and the final
  # "return True" are not visible in this copy -- verify upstream.
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it
  (and its children).

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
  for child in device.children:
    if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                      child, force, info):

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  # Remember the node-assigned physical id if it was not known before.
  if device.physical_id is None:
    device.physical_id = new_id
  # NOTE(review): several lines (the body of the CreateOnSecondary()
  # check, the early/failure returns and the final "return True") are
  # not visible in this copy -- verify upstream.
def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  # Each name is a cluster-unique id followed by the caller-supplied
  # suffix (e.g. ".sda_data").
  new_id = cfg.GenerateUniqueID()
  results.append("%s%s" % (new_id, val))
  # NOTE(review): the surrounding "for val in exts:" loop, the
  # "results = []" initialisation and the final "return results" are
  # not visible in this copy -- verify against the original file.
def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  # A data LV of the requested size plus a fixed 128 MB metadata LV
  # back the DRBD (7) device.
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta])
  # NOTE(review): the final "return drbd_dev" is not visible in this
  # copy -- verify against the original file.
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  # A data LV of the requested size plus a fixed 128 MB metadata LV
  # back the DRBD8 device.
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
  # NOTE(review): the trailing "iv_name=iv_name)" argument and the
  # final "return drbd_dev" are not visible in this copy.
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  Builds the 'sda' (data, disk_sz MB) and 'sdb' (swap, swap_sz MB)
  device trees appropriate for the requested disk template.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
  elif template_name == constants.DT_PLAIN:
    # Plain LVs: no secondary node may be given.
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_LOCAL_RAID1:
    # Local md RAID1: two LV mirrors per disk, still single-node.
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
                                       ".sdb_m1", ".sdb_m2"])
    sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[0]))
    sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                              logical_id=(vgname, names[1]))
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
                              children = [sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[2]))
    sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                              logical_id=(vgname, names[3]))
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
                              children = [sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_REMOTE_RAID1:
    # md over drbd7, mirrored to exactly one secondary node.
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2])
    md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
                              children = [drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4])
    md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
                              children = [drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == constants.DT_DRBD8:
    # drbd8 used directly, mirrored to exactly one secondary node.
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  # NOTE(review): several lines are elided in this copy (the diskless
  # "disks = []" branch body, some iv_name arguments, the final "else:"
  # before the raise below and the "return disks") -- verify upstream.
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2925 def _GetInstanceInfoText(instance):
2926 """Compute that text that should be added to the disk's metadata.
2929 return "originstname+%s" % instance.name
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    # Secondaries first: mirrored devices need their remote halves in
    # place before the primary side is created on top of them.
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
  # NOTE(review): the "return False" statements after the error logs
  # and the final "return True" are not visible in this copy.
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  for device in instance.disks:
    # A device tree may have pieces on several nodes (e.g. drbd
    # mirrors); walk the whole tree and remove each piece on its node.
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
  # NOTE(review): the success-flag bookkeeping and the final return
  # are not visible in this copy -- verify against the original file.
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  Args:
    disk_template: the disk template of the instance
    disk_size: size of the data ('sda') disk, in MB
    swap_size: size of the swap ('sdb') disk, in MB

  Returns:
    The required free space in the volume group, in MB, or None for
    templates (diskless) which consume no local space.

  Raises:
    errors.ProgrammerError: if the disk template is unknown

  """
  # Required free disk space as a function of disk and swap space.
  # The dict literal was truncated in this copy; reconstructed from
  # the visible entries.
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # each mirror needs a full local copy of both volumes
    constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
    constants.DT_DRBD8: disk_size + swap_size + 256,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  # Required opcode parameters, checked by LogicalUnit.__init__.
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    Asks the configured instance allocator for suitable nodes and
    stores its answer in self.op.pnode (and self.op.snode when two
    nodes are required).

    """
    # Describe the requested instance in the IAllocator input format.
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,

    ial.Run(self.op.iallocator)

      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
    if len(ial.nodes) != ial.required_nodes:
      # NOTE(review): the format string expects three values but only
      # two are visible in the argument tuple -- verify upstream.
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (len(ial.nodes), ial.required_nodes))
    # The allocator's answer fills in the node fields of the opcode.
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
    if self.op.mode == constants.INSTANCE_IMPORT:
      # When importing, expose the source of the import to the hooks.
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
                                     primary_node=self.op.pnode,
                                     secondary_nodes=self.secondaries,
                                     status=self.instance_status,
                                     os_type=self.op.os_type,
                                     memory=self.op.mem_size,
                                     vcpus=self.op.vcpus,
                                     nics=[(self.inst_ip, self.op.bridge, self.op.mac)],

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
    # NOTE(review): the "env = {...}" opening, the closing of the
    # env.update()/nl expressions and the final return are not visible
    # in this copy -- verify against the original file.

  def CheckPrereq(self):
    """Check prerequisites.

    Validates every instance parameter (mode, name, IP, MAC, bridge,
    boot order), resolves and checks the target nodes, free disk
    space and guest OS availability.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %

    if self.op.mode == constants.INSTANCE_IMPORT:
      # Import mode: the source node/path must be given, valid and
      # hold a version-compatible, single-disk export.
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
    elif ip.lower() == "auto":
      # "auto" resolves the instance name to its IP.
      inst_ip = hostname1.ip
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      # Refuse names whose IP already answers on the noded port.
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
      self.op.bridge = self.cfg.GetDefBridge()
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")

    # Exactly one of iallocator/pnode must be supplied; the allocator
    # fills in the node fields when used.
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
    self.op.pnode = pnode.name

    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % nodeinfo)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # The requested OS must be available on the primary node.
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # NOTE(review): a number of guard lines ("if pnode is None:",
    # "if not info:", "if not os_obj:", the bridge default "if/else"
    # and the "if self.op.start:" around the status below) are not
    # visible in this copy -- verify against the original file.
      self.instance_status = 'up'
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    # Some hypervisors need a network (e.g. VNC console) port.
    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # Roll back any partially-created devices.
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)

      # NOTE(review): an "if disk_abort:" guard appears to be missing
      # before this cleanup in this copy -- verify upstream.
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   (instance, pnode_name))

        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"

    logger.Info("starting instance %s on node %s" % (instance, pnode_name))
    feedback_fn("* starting instance...")
    if not rpc.call_instance_start(pnode_name, iobj, None):
      raise errors.OpExecError("Could not start instance")
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    # Ask the primary node which instances it is actually running.
    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)
    # Build the ssh command line the caller has to run on the master.
    argv = ["ssh", "-q", "-t"]
    argv.extend(ssh.KNOWN_HOSTS_OPTS)
    argv.extend(ssh.BATCH_MODE_OPTS)
    argv.append(console_cmd)
    # NOTE(review): the target-node argument appended to argv and the
    # final return of the command are not visible in this copy.
3432 class LUAddMDDRBDComponent(LogicalUnit):
3433 """Adda new mirror member to an instance's disk.
3436 HPATH = "mirror-add"
3437 HTYPE = constants.HTYPE_INSTANCE
3438 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    # Hooks run on the master, the current nodes of the instance and
    # the node receiving the new mirror member.
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node,] + list(self.instance.secondary_nodes)
    # NOTE(review): the "env = {...}" literal's opening/closing lines
    # and the final "return env, nl, nl" are not visible in this copy.
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    # The new mirror target must differ from the instance's primary.
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
    # Only remote_raid1 (md over drbd) instances can take a new mirror.
    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
    raise errors.OpPrereqError("Can't find this device ('%s') in the"
                               " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave devices."
                                 " This would create a 3-disk raid1 which we"
    # NOTE(review): several lines (the continuation of the error
    # messages above, the "break"/"else" of the disk search loop) are
    # not visible in this copy -- verify against the original file.
3492 def Exec(self, feedback_fn):
3493 """Add the mirror component
3497 instance = self.instance
3499 remote_node = self.remote_node
3500 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3501 names = _GenerateUniqueNames(self.cfg, lv_names)
3502 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3503 remote_node, disk.size, names)
3505 logger.Info("adding new mirror component on secondary")
3507 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3509 _GetInstanceInfoText(instance)):
3510 raise errors.OpExecError("Failed to create new component on secondary"
3511 " node %s" % remote_node)
3513 logger.Info("adding new mirror component on primary")
3515 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3517 _GetInstanceInfoText(instance)):
3518 # remove secondary dev
3519 self.cfg.SetDiskID(new_drbd, remote_node)
3520 rpc.call_blockdev_remove(remote_node, new_drbd)
3521 raise errors.OpExecError("Failed to create volume on primary")
3523 # the device exists now
3524 # call the primary node to add the mirror to md
3525 logger.Info("adding new mirror component to md")
3526 if not rpc.call_blockdev_addchildren(instance.primary_node,
3528 logger.Error("Can't add mirror compoment to md!")
3529 self.cfg.SetDiskID(new_drbd, remote_node)
3530 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3531 logger.Error("Can't rollback on secondary")
3532 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3533 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3534 logger.Error("Can't rollback on primary")
3535 raise errors.OpExecError("Can't add mirror component to md array")
3537 disk.children.append(new_drbd)
3539 self.cfg.AddInstance(instance)
3541 _WaitForSync(self.cfg, instance, self.proc)
class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  # required opcode parameters, checked by LogicalUnit.__init__
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    # NOTE(review): the "env = {" opener and the final "return env, nl, nl"
    # appear to be elided in this copy of the source -- confirm upstream.
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    # only remote_raid1 disks have mirror components to remove
    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
    # locate the named disk on the instance
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    # the mirror member to drop is identified by its drbd port, which is
    # the third element of the child's logical id
    for child in disk.children:
      if (child.dev_type == constants.LD_DRBD7 and
          child.logical_id[2] == self.op.disk_id):
      raise errors.OpPrereqError("Can't find the device with this port.")

    # never remove the last remaining mirror member
    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"

    # record which secondary node loses its copy (for the hooks env)
    if self.child.logical_id[0] == instance.primary_node:
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component

    """
    instance = self.instance

    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechildren(instance.primary_node,
      raise errors.OpExecError("Can't remove child from mirror.")

    # remove the backing devices of the (former) child from both nodes;
    # failures here are logged but do not abort the operation
    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  # required opcode parameters, checked by LogicalUnit.__init__
  _OP_REQP = ["instance_name", "mode", "disks"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    # NOTE(review): the "env = {" / "nl = [" openers and the final
    # "return env, nl, nl" appear to be elided in this copy of the source.
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
    env.update(_BuildInstanceHookEnvByObject(self.instance))
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    # remember the canonical (expanded) name in the opcode
    self.op.instance_name = instance.name

    # disk replacement only makes sense for network-mirrored templates
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    # the new secondary is an optional opcode parameter
    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
      # the user gave the current secondary, switch to
      # 'no-replace-secondary' mode for drbd7
    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
        self.op.mode != constants.REPLACE_DISK_ALL):
      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
                                 " disks replacement, not individual ones")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        # the target of the replacement is the primary; the other (healthy)
        # node is the secondary
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
        raise errors.ProgrammerError("Unhandled disk replace mode")

    # every requested disk name must exist on the instance
    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecRR1(self, feedback_fn):
    """Replace the disks of an instance.

    Replacement strategy for the remote_raid1 (md over drbd7) template:
    for each disk build a brand-new mirror branch, add it to the md
    array, wait for sync, then drop the old branch.

    """
    instance = self.instance
    # use the explicitly given new secondary, or keep the current one
    if self.op.remote_node is None:
      remote_node = self.sec_node
      remote_node = self.op.remote_node

    for dev in instance.disks:
      # two backing LVs (data + metadata) for the new drbd device
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      # remember (md device, old mirror, new mirror) per disk for later
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %

      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on secondary"
                                 " node %s. Full abort, cleanup manually!" %

      logger.Info("adding new mirror component on primary")

      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!"
                                 " Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
        # adding failed: roll back the new device on both nodes
        logger.Error("Can't add mirror compoment to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)

      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    # everything healthy: drop the old mirror member of each disk
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechildren(instance.primary_node,
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")

      # remove the old child's backing devices from both of its nodes
      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

    cfg.AddInstance(instance)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for dbrd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance

    vgname = self.cfg.GetVGName()

    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:

      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      # the data LV matches the disk size; the drbd meta LV is 128MB
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      # the new LVs now carry the old logical ids
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      # and the old LVs carry the temporary "_replaced-<time>" names
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        # attach failed: try to roll back the freshly created LVs
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance

    vgname = self.cfg.GetVGName()

    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")

    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      if rpc.call_blockdev_find(pri_node, dev):
        warning("Failed to detach drbd %s from network, unusual case" %

      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")

    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance
    # NOTE(review): the "fn = self._ExecRR1" assignment and the else:
    # branch lines appear elided in this copy of the source.
    if instance.disk_template == constants.DT_REMOTE_RAID1:
    elif instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
        fn = self._ExecD8Secondary
      raise errors.ProgrammerError("Unhandled disk replacement case")
    return fn(feedback_fn)
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  # required opcode parameters, checked by LogicalUnit.__init__
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      # explicit list given: every name must resolve to a known instance
      self.wanted_instances = []
      names = self.op.instances
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
      # empty list: report on all the instances in the cluster
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    Builds a dict describing the device, including the result of
    blockdev_find on the primary node ("pstatus") and on the secondary
    ("sstatus"), recursing into the device's children.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
        snode = dev.logical_id[0]

      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)

      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]

      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,

  def Exec(self, feedback_fn):
    """Gather and return data"""
    for instance in self.wanted_instances:
      # query the primary node for the instance's runtime state
      remote_info = rpc.call_instance_info(instance.primary_node,
      if remote_info and "state" in remote_info:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "network_port": instance.network_port,
        "vcpus": instance.vcpus,
        "kernel_path": instance.kernel_path,
        "initrd_path": instance.initrd_path,
        "hvm_boot_order": instance.hvm_boot_order,

      result[instance.name] = idict
class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  # only the instance name is mandatory; all modifiable parameters are
  # optional opcode attributes
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
      args['memory'] = self.mem
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      # fall back to the instance's current nic values for the fields
      # that are not being changed
        ip = self.instance.nics[0].ip
        bridge = self.bridge
        bridge = self.instance.nics[0].bridge
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # all modifiable parameters default to None (== not requested)
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order]
    # at least one parameter must have been given
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      # the magic value "none" clears the instance's IP
      if self.ip.lower() == "none":
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      # a new MAC must be unique cluster-wide and well-formed
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      # the special values "none"/"default" are accepted as-is
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        # only the device letters a/c/d/n are valid in a boot order
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    instance = self.instance
      instance.memory = self.mem
      result.append(("mem", self.mem))
      instance.vcpus = self.vcpus
      result.append(("vcpus",  self.vcpus))
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      # "default" resets the boot order to the hypervisor default (None)
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))

    self.cfg.AddInstance(instance)
class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    An empty/missing ``nodes`` attribute on the opcode means "all nodes"
    (handled by _GetWantedNodes).

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # destination node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    The instance is (optionally) shut down, its sda disk is snapshotted,
    the snapshot is exported to the target node, and older exports of
    the same instance on other nodes are removed.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        # BUGFIX: the original message used the undefined name
        # 'source_node'; the variable in scope is 'src_node'
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)
    finally:
      # restart the instance even if the snapshot step failed, but only
      # if it was configured to be running before the export
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to"
                     " node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      # remove the snapshot on the source node regardless of export result
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    Nothing to check: the instance may already be gone, in which case
    we fall back to the raw name passed in (see Exec).

    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.
  It resolves ``self.op.kind``/``self.op.name`` into ``self.target``, the
  taggable object the concrete LU operates on.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))
class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list of the target resolved by TagsLU.CheckPrereq.

    """
    return self.target.GetTags()
4679 class LUSearchTags(NoHooksLU):
4680 """Searches the tags for a given pattern.
4683 _OP_REQP = ["pattern"]
4685 def CheckPrereq(self):
4686 """Check prerequisites.
4688 This checks the pattern passed for validity by compiling it.
4692 self.re = re.compile(self.op.pattern)
4693 except re.error, err:
4694 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4695 (self.op.pattern, err))
4697 def Exec(self, feedback_fn):
4698 """Returns the tag list.
4702 tgts = [("/cluster", cfg.GetClusterInfo())]
4703 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4704 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4705 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4706 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4708 for path, target in tgts:
4709 for tag in target.GetTags():
4710 if self.re.search(tag):
4711 results.append((path, tag))
4715 class LUAddTags(TagsLU):
4716 """Sets a tag on a given object.
4719 _OP_REQP = ["kind", "name", "tags"]
4721 def CheckPrereq(self):
4722 """Check prerequisites.
4724 This checks the type and length of the tag name and value.
4727 TagsLU.CheckPrereq(self)
4728 for tag in self.op.tags:
4729 objects.TaggableObject.ValidateTag(tag)
4731 def Exec(self, feedback_fn):
4736 for tag in self.op.tags:
4737 self.target.AddTag(tag)
4738 except errors.TagError, err:
4739 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4741 self.cfg.Update(self.target)
4742 except errors.ConfigurationError:
4743 raise errors.OpRetryError("There has been a modification to the"
4744 " config file and the operation has been"
4745 " aborted. Please retry.")
class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      # concurrent config modification: the caller must retry the opcode
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """
    if self.op.on_nodes:
      # expand/validate the node list now so Exec gets real node names
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))
4822 class IAllocator(object):
4823 """IAllocator framework.
4825 An IAllocator instance has three sets of attributes:
4826 - cfg/sstore that are needed to query the cluster
4827 - input data (all members of the _KEYS class attribute are required)
4828 - four buffer attributes (in|out_data|text), that represent the
4829 input (to the external script) in text and data structure format,
4830 and the output from it, again in two formats
4831 - the result variables from the script (success, info, nodes) for
4836 "mem_size", "disks", "disk_template",
4837 "os", "tags", "nics", "vcpus",
4843 def __init__(self, cfg, sstore, mode, name, **kwargs):
4845 self.sstore = sstore
4846 # init buffer variables
4847 self.in_text = self.out_text = self.in_data = self.out_data = None
4848 # init all input fields so that pylint is happy
4851 self.mem_size = self.disks = self.disk_template = None
4852 self.os = self.tags = self.nics = self.vcpus = None
4853 self.relocate_from = None
4855 self.required_nodes = None
4856 # init result fields
4857 self.success = self.info = self.nodes = None
4858 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4859 keyset = self._ALLO_KEYS
4860 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4861 keyset = self._RELO_KEYS
4863 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4864 " IAllocator" % self.mode)
4866 if key not in keyset:
4867 raise errors.ProgrammerError("Invalid input parameter '%s' to"
4868 " IAllocator" % key)
4869 setattr(self, key, kwargs[key])
4871 if key not in kwargs:
4872 raise errors.ProgrammerError("Missing input parameter '%s' to"
4873 " IAllocator" % key)
4874 self._BuildInputData()
4876 def _ComputeClusterData(self):
4877 """Compute the generic allocator input data.
4879 This is the data that is independent of the actual operation.
4886 "cluster_name": self.sstore.GetClusterName(),
4887 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4888 # we don't have job IDs
4893 node_list = cfg.GetNodeList()
4894 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4895 for nname in node_list:
4896 ninfo = cfg.GetNodeInfo(nname)
4897 if nname not in node_data or not isinstance(node_data[nname], dict):
4898 raise errors.OpExecError("Can't get data for node %s" % nname)
4899 remote_info = node_data[nname]
4900 for attr in ['memory_total', 'memory_free',
4901 'vg_size', 'vg_free']:
4902 if attr not in remote_info:
4903 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4906 int(remote_info[attr])
4907 except ValueError, err:
4908 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4909 " %s" % (nname, attr, str(err)))
4911 "tags": list(ninfo.GetTags()),
4912 "total_memory": utils.TryConvert(int, remote_info['memory_total']),
4913 "free_memory": utils.TryConvert(int, remote_info['memory_free']),
4914 "total_disk": utils.TryConvert(int, remote_info['vg_size']),
4915 "free_disk": utils.TryConvert(int, remote_info['vg_free']),
4916 "primary_ip": ninfo.primary_ip,
4917 "secondary_ip": ninfo.secondary_ip,
4919 node_results[nname] = pnr
4920 data["nodes"] = node_results
4924 i_list = cfg.GetInstanceList()
4925 for iname in i_list:
4926 iinfo = cfg.GetInstanceInfo(iname)
4927 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4928 for n in iinfo.nics]
4930 "tags": list(iinfo.GetTags()),
4931 "should_run": iinfo.status == "up",
4932 "vcpus": iinfo.vcpus,
4933 "memory": iinfo.memory,
4935 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4937 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4938 "disk_template": iinfo.disk_template,
4940 instance_data[iname] = pir
4942 data["instances"] = instance_data
4946 def _AddNewInstance(self):
4947 """Add new instance data to allocator structure.
4949 This in combination with _AllocatorGetClusterData will create the
4950 correct structure needed as input for the allocator.
4952 The checks for the completeness of the opcode must have already been
4957 if len(self.disks) != 2:
4958 raise errors.OpExecError("Only two-disk configurations supported")
4960 disk_space = _ComputeDiskSize(self.disk_template,
4961 self.disks[0]["size"], self.disks[1]["size"])
4963 if self.disk_template in constants.DTS_NET_MIRROR:
4964 self.required_nodes = 2
4966 self.required_nodes = 1
4970 "disk_template": self.disk_template,
4973 "vcpus": self.vcpus,
4974 "memory": self.mem_size,
4975 "disks": self.disks,
4976 "disk_space_total": disk_space,
4978 "required_nodes": self.required_nodes,
4980 data["request"] = request
4982 def _AddRelocateInstance(self):
4983 """Add relocate instance data to allocator structure.
4985 This in combination with _IAllocatorGetClusterData will create the
4986 correct structure needed as input for the allocator.
4988 The checks for the completeness of the opcode must have already been
4992 instance = self.cfg.GetInstanceInfo(self.name)
4993 if instance is None:
4994 raise errors.ProgrammerError("Unknown instance '%s' passed to"
4995 " IAllocator" % self.name)
4997 if instance.disk_template not in constants.DTS_NET_MIRROR:
4998 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5000 if len(instance.secondary_nodes) != 1:
5001 raise errors.OpPrereqError("Instance has not exactly one secondary node")
5003 self.required_nodes = 1
5005 disk_space = _ComputeDiskSize(instance.disk_template,
5006 instance.disks[0].size,
5007 instance.disks[1].size)
5012 "disk_space_total": disk_space,
5013 "required_nodes": self.required_nodes,
5014 "relocate_from": self.relocate_from,
5016 self.in_data["request"] = request
5018 def _BuildInputData(self):
5019 """Build input data structures.
5022 self._ComputeClusterData()
5024 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5025 self._AddNewInstance()
5027 self._AddRelocateInstance()
5029 self.in_text = serializer.Dump(self.in_data)
5031 def Run(self, name, validate=True):
5032 """Run an instance allocator and return the results.
5037 alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
5039 if alloc_script is None:
5040 raise errors.OpExecError("Can't find allocator '%s'" % name)
5042 fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
5046 result = utils.RunCmd([alloc_script, fin_name])
5048 raise errors.OpExecError("Instance allocator call failed: %s,"
5050 (result.fail_reason, result.output))
5053 self.out_text = result.stdout
5055 self._ValidateResult()
5057 def _ValidateResult(self):
5058 """Process the allocator results.
5060 This will process and if successful save the result in
5061 self.out_data and the other parameters.
5065 rdict = serializer.Load(self.out_text)
5066 except Exception, err:
5067 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5069 if not isinstance(rdict, dict):
5070 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5072 for key in "success", "info", "nodes":
5073 if key not in rdict:
5074 raise errors.OpExecError("Can't parse iallocator results:"
5075 " missing key '%s'" % key)
5076 setattr(self, key, rdict[key])
5078 if not isinstance(rdict["nodes"], list):
5079 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5081 self.out_data = rdict
5084 class LUTestAllocator(NoHooksLU):
5085 """Run allocator tests.
5087 This LU runs the allocator tests
5090 _OP_REQP = ["direction", "mode", "name"]
5092 def CheckPrereq(self):
5093 """Check prerequisites.
5095 This checks the opcode parameters depending on the director and mode test.
5098 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5099 for attr in ["name", "mem_size", "disks", "disk_template",
5100 "os", "tags", "nics", "vcpus"]:
5101 if not hasattr(self.op, attr):
5102 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5104 iname = self.cfg.ExpandInstanceName(self.op.name)
5105 if iname is not None:
5106 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5108 if not isinstance(self.op.nics, list):
5109 raise errors.OpPrereqError("Invalid parameter 'nics'")
5110 for row in self.op.nics:
5111 if (not isinstance(row, dict) or
5114 "bridge" not in row):
5115 raise errors.OpPrereqError("Invalid contents of the"
5116 " 'nics' parameter")
5117 if not isinstance(self.op.disks, list):
5118 raise errors.OpPrereqError("Invalid parameter 'disks'")
5119 if len(self.op.disks) != 2:
5120 raise errors.OpPrereqError("Only two-disk configurations supported")
5121 for row in self.op.disks:
5122 if (not isinstance(row, dict) or
5123 "size" not in row or
5124 not isinstance(row["size"], int) or
5125 "mode" not in row or
5126 row["mode"] not in ['r', 'w']):
5127 raise errors.OpPrereqError("Invalid contents of the"
5128 " 'disks' parameter")
5129 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5130 if not hasattr(self.op, "name"):
5131 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5132 fname = self.cfg.ExpandInstanceName(self.op.name)
5134 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5136 self.op.name = fname
5137 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5139 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5142 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5143 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5144 raise errors.OpPrereqError("Missing allocator name")
5145 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5146 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5149 def Exec(self, feedback_fn):
5150 """Run the allocator test.
5153 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5154 ial = IAllocator(self.cfg, self.sstore,
5157 mem_size=self.op.mem_size,
5158 disks=self.op.disks,
5159 disk_template=self.op.disk_template,
5163 vcpus=self.op.vcpus,
5166 ial = IAllocator(self.cfg, self.sstore,
5169 relocate_from=list(self.relocate_from),
5172 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5173 result = ial.in_text
5175 ial.Run(self.op.allocator, validate=False)
5176 result = ial.out_text