4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement CheckPrereq which also fills in the opcode instance
53 with all the fields (even if as None)
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements (REQ_CLUSTER,
58 REQ_MASTER); note that all commands require root permissions
67 def __init__(self, processor, op, cfg, sstore):
68 """Constructor for LogicalUnit.
70 This needs to be overridden in derived classes in order to check op
78 for attr_name in self._OP_REQP:
79 attr_val = getattr(op, attr_name, None)
81 raise errors.OpPrereqError("Required parameter '%s' missing" %
84 if not cfg.IsCluster():
85 raise errors.OpPrereqError("Cluster not initialized yet,"
86 " use 'gnt-cluster init' first.")
88 master = sstore.GetMasterNode()
89 if master != utils.HostInfo().name:
90 raise errors.OpPrereqError("Commands must be run on the master"
93 def CheckPrereq(self):
94 """Check prerequisites for this LU.
96 This method should check that the prerequisites for the execution
97 of this LU are fulfilled. It can do internode communication, but
98 it should be idempotent - no cluster or system changes are
101 The method should raise errors.OpPrereqError in case something is
102 not fulfilled. Its return value is ignored.
104 This method should also update all the parameters of the opcode to
105 their canonical form; e.g. a short node name must be fully
106 expanded after this method has successfully completed (so that
107 hooks, logging, etc. work correctly).
110 raise NotImplementedError
112 def Exec(self, feedback_fn):
115 This method should implement the actual work. It should raise
116 errors.OpExecError for failures that are somewhat dealt with in
120 raise NotImplementedError
122 def BuildHooksEnv(self):
123 """Build hooks environment for this LU.
125 This method should return a three-element tuple consisting of: a dict
126 containing the environment that will be used for running the
127 specific hook for this LU, a list of node names on which the hook
128 should run before the execution, and a list of node names on which
129 the hook should run after the execution.
131 The keys of the dict must not be prefixed with 'GANETI_', as this
132 prefix will be added by the hooks runner. Note that the hooks runner
133 will also add keys of its own. If the LU doesn't define any
134 environment, an empty dict (and not None) should be returned.
136 No nodes should be returned as an empty list (and not None).
138 Note that if the HPATH for a LU class is None, this function will
142 raise NotImplementedError
144 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
145 """Notify the LU about the results of its hooks.
147 This method is called every time a hooks phase is executed, and notifies
148 the Logical Unit about the hooks' result. The LU can then use it to alter
149 its result based on the hooks. By default the method does nothing and the
150 previous result is passed back unchanged but any LU can define it if it
151 wants to use the local cluster hook-scripts somehow.
154 phase: the hooks phase that has just been run
155 hook_results: the results of the multi-node hooks rpc call
156 feedback_fn: function to send feedback back to the caller
157 lu_result: the previous result this LU had, or None in the PRE phase.
163 class NoHooksLU(LogicalUnit):
164 """Simple LU which runs no hooks.
166 This LU is intended as a parent for other LogicalUnits which will
167 run no hooks, in order to reduce duplicate code.
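# Illustrative sketch (not part of the original module): a minimal LogicalUnit
# subclass following the rules above. The class name and the opcode field
# "node_name" are hypothetical; real LUs that run hooks also set HPATH/HTYPE.
#
#   class LUExampleQuery(NoHooksLU):
#     _OP_REQP = ["node_name"]
#
#     def CheckPrereq(self):
#       # canonicalise the opcode parameters, as CheckPrereq requires
#       node_name = self.cfg.ExpandNodeName(self.op.node_name)
#       if node_name is None:
#         raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
#       self.op.node_name = node_name
#
#     def Exec(self, feedback_fn):
#       feedback_fn("example LU ran on %s" % self.op.node_name)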
174 def _AddHostToEtcHosts(hostname):
175 """Wrapper around utils.SetEtcHostsEntry.
178 hi = utils.HostInfo(name=hostname)
179 utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
182 def _RemoveHostFromEtcHosts(hostname):
183 """Wrapper around utils.RemoveEtcHostsEntry.
186 hi = utils.HostInfo(name=hostname)
187 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
188 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
191 def _GetWantedNodes(lu, nodes):
192 """Returns list of checked and expanded node names.
195 nodes: List of nodes (strings) or None for all
198 if not isinstance(nodes, list):
199 raise errors.OpPrereqError("Invalid argument type 'nodes'")
205 node = lu.cfg.ExpandNodeName(name)
207 raise errors.OpPrereqError("No such node name '%s'" % name)
211 wanted = lu.cfg.GetNodeList()
212 return utils.NiceSort(wanted)
215 def _GetWantedInstances(lu, instances):
216 """Returns list of checked and expanded instance names.
219 instances: List of instances (strings) or None for all
222 if not isinstance(instances, list):
223 raise errors.OpPrereqError("Invalid argument type 'instances'")
228 for name in instances:
229 instance = lu.cfg.ExpandInstanceName(name)
231 raise errors.OpPrereqError("No such instance name '%s'" % name)
232 wanted.append(instance)
235 wanted = lu.cfg.GetInstanceList()
236 return utils.NiceSort(wanted)
239 def _CheckOutputFields(static, dynamic, selected):
240 """Checks whether all selected fields are valid.
243 static: Static fields
244 dynamic: Dynamic fields
247 static_fields = frozenset(static)
248 dynamic_fields = frozenset(dynamic)
250 all_fields = static_fields | dynamic_fields
252 if not all_fields.issuperset(selected):
253 raise errors.OpPrereqError("Unknown output fields selected: %s"
254 % ",".join(frozenset(selected).
255 difference(all_fields)))
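# Illustrative usage sketch (assumption, mirroring the query LUs below): a
# query LU typically calls this from its CheckPrereq, e.g.
#
#   _CheckOutputFields(static=["name"],
#                      dynamic=["mfree", "dfree"],
#                      selected=self.op.output_fields)
#
# which raises OpPrereqError("Unknown output fields selected: ...") if
# self.op.output_fields contains anything outside the two sets.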
258 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
259 memory, vcpus, nics):
260 """Builds instance related env variables for hooks from single variables.
263 secondary_nodes: List of secondary nodes as strings
267 "INSTANCE_NAME": name,
268 "INSTANCE_PRIMARY": primary_node,
269 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
270 "INSTANCE_OS_TYPE": os_type,
271 "INSTANCE_STATUS": status,
272 "INSTANCE_MEMORY": memory,
273 "INSTANCE_VCPUS": vcpus,
277 nic_count = len(nics)
278 for idx, (ip, bridge, mac) in enumerate(nics):
281 env["INSTANCE_NIC%d_IP" % idx] = ip
282 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
283 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
287 env["INSTANCE_NIC_COUNT"] = nic_count
292 def _BuildInstanceHookEnvByObject(instance, override=None):
293 """Builds instance related env variables for hooks from an object.
296 instance: objects.Instance object of instance
297 override: dict of values to override
300 'name': instance.name,
301 'primary_node': instance.primary_node,
302 'secondary_nodes': instance.secondary_nodes,
303 'os_type': instance.os,
304 'status': instance.status,
305 'memory': instance.memory,
306 'vcpus': instance.vcpus,
307 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
310 args.update(override)
311 return _BuildInstanceHookEnv(**args)
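# Illustrative usage (assumption, not taken from a specific caller): callers
# can override individual fields, e.g.
#
#   env = _BuildInstanceHookEnvByObject(instance, override={'status': 'up'})
#
# which replaces the status taken from the instance object while keeping all
# other values derived from it.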
314 def _UpdateKnownHosts(fullnode, ip, pubkey):
315 """Ensure a node has a correct known_hosts entry.
318 fullnode - Fully qualified domain name of host. (str)
319 ip - IPv4 address of host (str)
320 pubkey - the public key of the cluster
323 if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
324 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
326 f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
335 logger.Debug('read %s' % (repr(rawline),))
337 parts = rawline.rstrip('\r\n').split()
339 # Ignore unwanted lines
340 if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
341 fields = parts[0].split(',')
346 for spec in [ ip, fullnode ]:
347 if spec not in fields:
352 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
353 if haveall and key == pubkey:
355 save_lines.append(rawline)
356 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
359 if havesome and (not haveall or key != pubkey):
361 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
364 save_lines.append(rawline)
367 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
368 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
371 save_lines = save_lines + add_lines
373 # Write a new file and replace old.
374 fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
376 newfile = os.fdopen(fd, 'w')
378 newfile.write(''.join(save_lines))
381 logger.Debug("Wrote new known_hosts.")
382 os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
385 # Simply appending a new line will do the trick.
387 for add in add_lines:
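# Each entry built into add_lines above follows the standard known_hosts
# format; with the cluster key it looks roughly like (illustrative values):
#
#   node1.example.com,192.0.2.1 ssh-rsa AAAAB3NzaC1yc2E...
#
# i.e. a comma-separated host/IP list, the key type and the base64 public key.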
393 def _HasValidVG(vglist, vgname):
394 """Checks if the volume group list is valid.
396 A non-None return value means there's an error, and the return value
397 is the error message.
400 vgsize = vglist.get(vgname, None)
402 return "volume group '%s' missing" % vgname
404 return ("volume group '%s' too small (20480MiB required, %dMib found)" %
409 def _InitSSHSetup(node):
410 """Setup the SSH configuration for the cluster.
413 This generates a dsa keypair for root, adds the pub key to the
414 permitted hosts and adds the hostkey to its own known hosts.
417 node: the name of this host as a fqdn
420 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
422 for name in priv_key, pub_key:
423 if os.path.exists(name):
424 utils.CreateBackup(name)
425 utils.RemoveFile(name)
427 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
431 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
434 f = open(pub_key, 'r')
436 utils.AddAuthorizedKey(auth_keys, f.read(8192))
441 def _InitGanetiServerSetup(ss):
442 """Setup the necessary configuration for the initial node daemon.
444 This creates the nodepass file containing the shared password for
445 the cluster and also generates the SSL certificate.
448 # Create pseudo random password
449 randpass = sha.new(os.urandom(64)).hexdigest()
450 # and write it into sstore
451 ss.SetKey(ss.SS_NODED_PASS, randpass)
453 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
454 "-days", str(365*5), "-nodes", "-x509",
455 "-keyout", constants.SSL_CERT_FILE,
456 "-out", constants.SSL_CERT_FILE, "-batch"])
458 raise errors.OpExecError("could not generate server ssl cert, command"
459 " %s had exitcode %s and error message %s" %
460 (result.cmd, result.exit_code, result.output))
462 os.chmod(constants.SSL_CERT_FILE, 0400)
464 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
467 raise errors.OpExecError("Could not start the node daemon, command %s"
468 " had exitcode %s and error %s" %
469 (result.cmd, result.exit_code, result.output))
472 def _CheckInstanceBridgesExist(instance):
473 """Check that the brigdes needed by an instance exist.
476 # check bridges existance
477 brlist = [nic.bridge for nic in instance.nics]
478 if not rpc.call_bridges_exist(instance.primary_node, brlist):
479 raise errors.OpPrereqError("one or more target bridges %s does not"
480 " exist on destination node '%s'" %
481 (brlist, instance.primary_node))
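# Illustrative usage (see LUStartupInstance.CheckPrereq below): start/reboot
# LUs call this before touching the instance, e.g.
#
#   _CheckInstanceBridgesExist(instance)
#
# so that a missing bridge on the primary node aborts the operation early with
# an OpPrereqError instead of failing halfway through Exec.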
484 class LUInitCluster(LogicalUnit):
485 """Initialise the cluster.
488 HPATH = "cluster-init"
489 HTYPE = constants.HTYPE_CLUSTER
490 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
491 "def_bridge", "master_netdev"]
494 def BuildHooksEnv(self):
497 Notes: Since we don't require a cluster, we must manually add
498 ourselves in the post-run node list.
501 env = {"OP_TARGET": self.op.cluster_name}
502 return env, [], [self.hostname.name]
504 def CheckPrereq(self):
505 """Verify that the passed name is a valid one.
508 if config.ConfigWriter.IsCluster():
509 raise errors.OpPrereqError("Cluster is already initialised")
511 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
512 if not os.path.exists(constants.VNC_PASSWORD_FILE):
513 raise errors.OpPrereqError("Please prepare the cluster VNC"
515 constants.VNC_PASSWORD_FILE)
517 self.hostname = hostname = utils.HostInfo()
519 if hostname.ip.startswith("127."):
520 raise errors.OpPrereqError("This host's IP resolves to the private"
521 " range (%s). Please fix DNS or %s." %
522 (hostname.ip, constants.ETC_HOSTS))
524 if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
525 source=constants.LOCALHOST_IP_ADDRESS):
526 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
527 " to %s,\nbut this ip address does not"
528 " belong to this host."
529 " Aborting." % hostname.ip)
531 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
533 if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
535 raise errors.OpPrereqError("Cluster IP already active. Aborting.")
537 secondary_ip = getattr(self.op, "secondary_ip", None)
538 if secondary_ip and not utils.IsValidIP(secondary_ip):
539 raise errors.OpPrereqError("Invalid secondary ip given")
541 secondary_ip != hostname.ip and
542 (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
543 source=constants.LOCALHOST_IP_ADDRESS))):
544 raise errors.OpPrereqError("You gave %s as secondary IP,"
545 " but it does not belong to this host." %
547 self.secondary_ip = secondary_ip
549 # checks presence of the volume group given
550 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
553 raise errors.OpPrereqError("Error: %s" % vgstatus)
555 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
557 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
560 if self.op.hypervisor_type not in constants.HYPER_TYPES:
561 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
562 self.op.hypervisor_type)
564 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
566 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
567 (self.op.master_netdev,
568 result.output.strip()))
570 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
571 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
572 raise errors.OpPrereqError("Init.d script '%s' missing or not"
573 " executable." % constants.NODE_INITD_SCRIPT)
575 def Exec(self, feedback_fn):
576 """Initialize the cluster.
579 clustername = self.clustername
580 hostname = self.hostname
582 # set up the simple store
583 self.sstore = ss = ssconf.SimpleStore()
584 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
585 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
586 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
587 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
588 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
590 # set up the inter-node password and certificate
591 _InitGanetiServerSetup(ss)
593 # start the master ip
594 rpc.call_node_start_master(hostname.name)
596 # set up ssh config and /etc/hosts
597 f = open(constants.SSH_HOST_RSA_PUB, 'r')
602 sshkey = sshline.split(" ")[1]
604 _AddHostToEtcHosts(hostname.name)
606 _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
608 _InitSSHSetup(hostname.name)
610 # init of cluster config file
611 self.cfg = cfgw = config.ConfigWriter()
612 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
613 sshkey, self.op.mac_prefix,
614 self.op.vg_name, self.op.def_bridge)
617 class LUDestroyCluster(NoHooksLU):
618 """Logical unit for destroying the cluster.
623 def CheckPrereq(self):
624 """Check prerequisites.
626 This checks whether the cluster is empty.
628 Any errors are signalled by raising errors.OpPrereqError.
631 master = self.sstore.GetMasterNode()
633 nodelist = self.cfg.GetNodeList()
634 if len(nodelist) != 1 or nodelist[0] != master:
635 raise errors.OpPrereqError("There are still %d node(s) in"
636 " this cluster." % (len(nodelist) - 1))
637 instancelist = self.cfg.GetInstanceList()
639 raise errors.OpPrereqError("There are still %d instance(s) in"
640 " this cluster." % len(instancelist))
642 def Exec(self, feedback_fn):
643 """Destroys the cluster.
646 master = self.sstore.GetMasterNode()
647 if not rpc.call_node_stop_master(master):
648 raise errors.OpExecError("Could not disable the master role")
649 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
650 utils.CreateBackup(priv_key)
651 utils.CreateBackup(pub_key)
652 rpc.call_node_leave_cluster(master)
655 class LUVerifyCluster(NoHooksLU):
656 """Verifies the cluster status.
659 _OP_REQP = ["skip_checks"]
661 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
662 remote_version, feedback_fn):
663 """Run multiple tests against a node.
666 - compares ganeti version
667 - checks vg existence and size > 20G
668 - checks config file checksum
669 - checks ssh to other nodes
672 node: name of the node to check
673 file_list: required list of files
674 local_cksum: dictionary of local files and their checksums
677 # compares ganeti version
678 local_version = constants.PROTOCOL_VERSION
679 if not remote_version:
680 feedback_fn(" - ERROR: connection to %s failed" % (node))
683 if local_version != remote_version:
684 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
685 (local_version, node, remote_version))
688 # checks vg existence and size > 20G
692 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
696 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
698 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
701 # checks config file checksum
704 if 'filelist' not in node_result:
706 feedback_fn(" - ERROR: node hasn't returned file checksum data")
708 remote_cksum = node_result['filelist']
709 for file_name in file_list:
710 if file_name not in remote_cksum:
712 feedback_fn(" - ERROR: file '%s' missing" % file_name)
713 elif remote_cksum[file_name] != local_cksum[file_name]:
715 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
717 if 'nodelist' not in node_result:
719 feedback_fn(" - ERROR: node hasn't returned node connectivity data")
721 if node_result['nodelist']:
723 for node in node_result['nodelist']:
724 feedback_fn(" - ERROR: communication with node '%s': %s" %
725 (node, node_result['nodelist'][node]))
726 hyp_result = node_result.get('hypervisor', None)
727 if hyp_result is not None:
728 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
731 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
732 node_instance, feedback_fn):
733 """Verify an instance.
735 This function checks to see if the required block devices are
736 available on the instance's node.
741 node_current = instanceconfig.primary_node
744 instanceconfig.MapLVsByNode(node_vol_should)
746 for node in node_vol_should:
747 for volume in node_vol_should[node]:
748 if node not in node_vol_is or volume not in node_vol_is[node]:
749 feedback_fn(" - ERROR: volume %s missing on node %s" %
753 if instanceconfig.status != 'down':
754 if (node_current not in node_instance or
755 instance not in node_instance[node_current]):
756 feedback_fn(" - ERROR: instance %s not running on node %s" %
757 (instance, node_current))
760 for node in node_instance:
761 if node != node_current:
762 if instance in node_instance[node]:
763 feedback_fn(" - ERROR: instance %s should not run on node %s" %
769 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
770 """Verify if there are any unknown volumes in the cluster.
772 The .os, .swap and backup volumes are ignored. All other volumes are
778 for node in node_vol_is:
779 for volume in node_vol_is[node]:
780 if node not in node_vol_should or volume not in node_vol_should[node]:
781 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
786 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
787 """Verify the list of running instances.
789 This checks what instances are running but unknown to the cluster.
793 for node in node_instance:
794 for runninginstance in node_instance[node]:
795 if runninginstance not in instancelist:
796 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
797 (runninginstance, node))
801 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
802 """Verify N+1 Memory Resilience.
804 Check that if one single node dies we can still start all the instances it
810 for node, nodeinfo in node_info.iteritems():
811 # This code checks that every node which is now listed as secondary has
812 # enough memory to host all the instances it would have to, should a
813 # single other node in the cluster fail.
814 # FIXME: not ready for failover to an arbitrary node
815 # FIXME: does not support file-backed instances
816 # WARNING: we currently take into account down instances as well as up
817 # ones, considering that even if they're down someone might want to start
818 # them even in the event of a node failure.
819 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
821 for instance in instances:
822 needed_mem += instance_cfg[instance].memory
823 if nodeinfo['mfree'] < needed_mem:
824 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
825 " failovers should node %s fail" % (node, prinode))
829 def CheckPrereq(self):
830 """Check prerequisites.
832 Transform the list of checks we're going to skip into a set and check that
833 all its members are valid.
836 self.skip_set = frozenset(self.op.skip_checks)
837 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
838 raise errors.OpPrereqError("Invalid checks to be skipped specified")
840 def Exec(self, feedback_fn):
841 """Verify integrity of cluster, performing various test on nodes.
845 feedback_fn("* Verifying global settings")
846 for msg in self.cfg.VerifyConfig():
847 feedback_fn(" - ERROR: %s" % msg)
849 vg_name = self.cfg.GetVGName()
850 nodelist = utils.NiceSort(self.cfg.GetNodeList())
851 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
852 i_non_redundant = [] # Non redundant instances
858 # FIXME: verify OS list
860 file_names = list(self.sstore.GetFileList())
861 file_names.append(constants.SSL_CERT_FILE)
862 file_names.append(constants.CLUSTER_CONF_FILE)
863 local_checksums = utils.FingerprintFiles(file_names)
865 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
866 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
867 all_instanceinfo = rpc.call_instance_list(nodelist)
868 all_vglist = rpc.call_vg_list(nodelist)
869 node_verify_param = {
870 'filelist': file_names,
871 'nodelist': nodelist,
874 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
875 all_rversion = rpc.call_version(nodelist)
876 all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
878 for node in nodelist:
879 feedback_fn("* Verifying node %s" % node)
880 result = self._VerifyNode(node, file_names, local_checksums,
881 all_vglist[node], all_nvinfo[node],
882 all_rversion[node], feedback_fn)
886 volumeinfo = all_volumeinfo[node]
888 if isinstance(volumeinfo, basestring):
889 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
890 (node, volumeinfo[-400:].encode('string_escape')))
892 node_volume[node] = {}
893 elif not isinstance(volumeinfo, dict):
894 feedback_fn(" - ERROR: connection to %s failed" % (node,))
898 node_volume[node] = volumeinfo
901 nodeinstance = all_instanceinfo[node]
902 if type(nodeinstance) != list:
903 feedback_fn(" - ERROR: connection to %s failed" % (node,))
907 node_instance[node] = nodeinstance
910 nodeinfo = all_ninfo[node]
911 if not isinstance(nodeinfo, dict):
912 feedback_fn(" - ERROR: connection to %s failed" % (node,))
918 "mfree": int(nodeinfo['memory_free']),
919 "dfree": int(nodeinfo['vg_free']),
922 # dictionary holding all instances this node is secondary for,
923 # grouped by their primary node. Each key is a cluster node, and each
924 # value is a list of instances which have the key as primary and the
925 # current node as secondary. this is handy to calculate N+1 memory
926 # availability if you can only failover from a primary to its
928 "sinst-by-pnode": {},
931 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
937 for instance in instancelist:
938 feedback_fn("* Verifying instance %s" % instance)
939 inst_config = self.cfg.GetInstanceInfo(instance)
940 result = self._VerifyInstance(instance, inst_config, node_volume,
941 node_instance, feedback_fn)
944 inst_config.MapLVsByNode(node_vol_should)
946 instance_cfg[instance] = inst_config
948 pnode = inst_config.primary_node
949 if pnode in node_info:
950 node_info[pnode]['pinst'].append(instance)
952 feedback_fn(" - ERROR: instance %s, connection to primary node"
953 " %s failed" % (instance, pnode))
956 # If the instance is non-redundant we cannot survive losing its primary
957 # node, so we are not N+1 compliant. On the other hand we have no disk
958 # templates with more than one secondary so that situation is not well
960 # FIXME: does not support file-backed instances
961 if len(inst_config.secondary_nodes) == 0:
962 i_non_redundant.append(instance)
963 elif len(inst_config.secondary_nodes) > 1:
964 feedback_fn(" - WARNING: multiple secondaries for instance %s"
967 for snode in inst_config.secondary_nodes:
968 if snode in node_info:
969 node_info[snode]['sinst'].append(instance)
970 if pnode not in node_info[snode]['sinst-by-pnode']:
971 node_info[snode]['sinst-by-pnode'][pnode] = []
972 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
974 feedback_fn(" - ERROR: instance %s, connection to secondary node"
975 " %s failed" % (instance, snode))
977 feedback_fn("* Verifying orphan volumes")
978 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
982 feedback_fn("* Verifying remaining instances")
983 result = self._VerifyOrphanInstances(instancelist, node_instance,
987 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
988 feedback_fn("* Verifying N+1 Memory redundancy")
989 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
992 feedback_fn("* Other Notes")
994 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
995 % len(i_non_redundant))
1000 class LUVerifyDisks(NoHooksLU):
1001 """Verifies the cluster disks status.
1006 def CheckPrereq(self):
1007 """Check prerequisites.
1009 This has no prerequisites.
1014 def Exec(self, feedback_fn):
1015 """Verify integrity of cluster disks.
1018 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1020 vg_name = self.cfg.GetVGName()
1021 nodes = utils.NiceSort(self.cfg.GetNodeList())
1022 instances = [self.cfg.GetInstanceInfo(name)
1023 for name in self.cfg.GetInstanceList()]
1026 for inst in instances:
1028 if (inst.status != "up" or
1029 inst.disk_template not in constants.DTS_NET_MIRROR):
1031 inst.MapLVsByNode(inst_lvs)
1032 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1033 for node, vol_list in inst_lvs.iteritems():
1034 for vol in vol_list:
1035 nv_dict[(node, vol)] = inst
1040 node_lvs = rpc.call_volume_list(nodes, vg_name)
1045 lvs = node_lvs[node]
1047 if isinstance(lvs, basestring):
1048 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1049 res_nlvm[node] = lvs
1050 elif not isinstance(lvs, dict):
1051 logger.Info("connection to node %s failed or invalid data returned" %
1053 res_nodes.append(node)
1056 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1057 inst = nv_dict.pop((node, lv_name), None)
1058 if (not lv_online and inst is not None
1059 and inst.name not in res_instances):
1060 res_instances.append(inst.name)
1062 # any leftover items in nv_dict are missing LVs, let's arrange the
1064 for key, inst in nv_dict.iteritems():
1065 if inst.name not in res_missing:
1066 res_missing[inst.name] = []
1067 res_missing[inst.name].append(key)
1072 class LURenameCluster(LogicalUnit):
1073 """Rename the cluster.
1076 HPATH = "cluster-rename"
1077 HTYPE = constants.HTYPE_CLUSTER
1080 def BuildHooksEnv(self):
1085 "OP_TARGET": self.sstore.GetClusterName(),
1086 "NEW_NAME": self.op.name,
1088 mn = self.sstore.GetMasterNode()
1089 return env, [mn], [mn]
1091 def CheckPrereq(self):
1092 """Verify that the passed name is a valid one.
1095 hostname = utils.HostInfo(self.op.name)
1097 new_name = hostname.name
1098 self.ip = new_ip = hostname.ip
1099 old_name = self.sstore.GetClusterName()
1100 old_ip = self.sstore.GetMasterIP()
1101 if new_name == old_name and new_ip == old_ip:
1102 raise errors.OpPrereqError("Neither the name nor the IP address of the"
1103 " cluster has changed")
1104 if new_ip != old_ip:
1105 result = utils.RunCmd(["fping", "-q", new_ip])
1106 if not result.failed:
1107 raise errors.OpPrereqError("The given cluster IP address (%s) is"
1108 " reachable on the network. Aborting." %
1111 self.op.name = new_name
1113 def Exec(self, feedback_fn):
1114 """Rename the cluster.
1117 clustername = self.op.name
1121 # shutdown the master IP
1122 master = ss.GetMasterNode()
1123 if not rpc.call_node_stop_master(master):
1124 raise errors.OpExecError("Could not disable the master role")
1128 ss.SetKey(ss.SS_MASTER_IP, ip)
1129 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1131 # Distribute updated ss config to all nodes
1132 myself = self.cfg.GetNodeInfo(master)
1133 dist_nodes = self.cfg.GetNodeList()
1134 if myself.name in dist_nodes:
1135 dist_nodes.remove(myself.name)
1137 logger.Debug("Copying updated ssconf data to all nodes")
1138 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1139 fname = ss.KeyToFilename(keyname)
1140 result = rpc.call_upload_file(dist_nodes, fname)
1141 for to_node in dist_nodes:
1142 if not result[to_node]:
1143 logger.Error("copy of file %s to node %s failed" %
1146 if not rpc.call_node_start_master(master):
1147 logger.Error("Could not re-enable the master role on the master,"
1148 " please restart manually.")
1151 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1152 """Sleep and poll for an instance's disk to sync.
1155 if not instance.disks:
1159 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1161 node = instance.primary_node
1163 for dev in instance.disks:
1164 cfgw.SetDiskID(dev, node)
1170 cumul_degraded = False
1171 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1173 proc.LogWarning("Can't get any data from node %s" % node)
1176 raise errors.RemoteError("Can't contact node %s for mirror data,"
1177 " aborting." % node)
1181 for i in range(len(rstats)):
1184 proc.LogWarning("Can't compute data for node %s/%s" %
1185 (node, instance.disks[i].iv_name))
1187 # we ignore the ldisk parameter
1188 perc_done, est_time, is_degraded, _ = mstat
1189 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1190 if perc_done is not None:
1192 if est_time is not None:
1193 rem_time = "%d estimated seconds remaining" % est_time
1196 rem_time = "no time estimate"
1197 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1198 (instance.disks[i].iv_name, perc_done, rem_time))
1205 time.sleep(min(60, max_time))
1211 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1212 return not cumul_degraded
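# Illustrative example (units are an assumption): a mirror status entry of
# mstat = (87.5, 240, True, False) is reported above as
#   "- device sda: 87.50% done, 240 estimated seconds remaining"
# and, since perc_done is not None, does not by itself mark the sync as
# cumulatively degraded.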
1215 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1216 """Check that mirrors are not degraded.
1218 The ldisk parameter, if True, will change the test from the
1219 is_degraded attribute (which represents overall non-ok status for
1220 the device(s)) to the ldisk (representing the local storage status).
1223 cfgw.SetDiskID(dev, node)
1230 if on_primary or dev.AssembleOnSecondary():
1231 rstats = rpc.call_blockdev_find(node, dev)
1233 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1236 result = result and (not rstats[idx])
1238 for child in dev.children:
1239 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1244 class LUDiagnoseOS(NoHooksLU):
1245 """Logical unit for OS diagnose/query.
1248 _OP_REQP = ["output_fields", "names"]
1250 def CheckPrereq(self):
1251 """Check prerequisites.
1253 This always succeeds, since this is a pure query LU.
1257 raise errors.OpPrereqError("Selective OS query not supported")
1259 self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1260 _CheckOutputFields(static=[],
1261 dynamic=self.dynamic_fields,
1262 selected=self.op.output_fields)
1265 def _DiagnoseByOS(node_list, rlist):
1266 """Remaps a per-node return list into an a per-os per-node dictionary
1269 node_list: a list with the names of all nodes
1270 rlist: a map with node names as keys and OS objects as values
1273 map: a map with osnames as keys and as value another map, with
1275 keys and list of OS objects as values
1276 e.g. {"debian-etch": {"node1": [<object>,...],
1277 "node2": [<object>,]}
1282 for node_name, nr in rlist.iteritems():
1286 if os.name not in all_os:
1287 # build a list of nodes for this os containing empty lists
1288 # for each node in node_list
1289 all_os[os.name] = {}
1290 for nname in node_list:
1291 all_os[os.name][nname] = []
1292 all_os[os.name][node_name].append(os)
1295 def Exec(self, feedback_fn):
1296 """Compute the list of OSes.
1299 node_list = self.cfg.GetNodeList()
1300 node_data = rpc.call_os_diagnose(node_list)
1301 if node_data == False:
1302 raise errors.OpExecError("Can't gather the list of OSes")
1303 pol = self._DiagnoseByOS(node_list, node_data)
1305 for os_name, os_data in pol.iteritems():
1307 for field in self.op.output_fields:
1310 elif field == "valid":
1311 val = utils.all([osl and osl[0] for osl in os_data.values()])
1312 elif field == "node_status":
1314 for node_name, nos_list in os_data.iteritems():
1315 val[node_name] = [(v.status, v.path) for v in nos_list]
1317 raise errors.ParameterError(field)
1324 class LURemoveNode(LogicalUnit):
1325 """Logical unit for removing a node.
1328 HPATH = "node-remove"
1329 HTYPE = constants.HTYPE_NODE
1330 _OP_REQP = ["node_name"]
1332 def BuildHooksEnv(self):
1335 This doesn't run on the target node in the pre phase as a failed
1336 node would not allow itself to run.
1340 "OP_TARGET": self.op.node_name,
1341 "NODE_NAME": self.op.node_name,
1343 all_nodes = self.cfg.GetNodeList()
1344 all_nodes.remove(self.op.node_name)
1345 return env, all_nodes, all_nodes
1347 def CheckPrereq(self):
1348 """Check prerequisites.
1351 - the node exists in the configuration
1352 - it does not have primary or secondary instances
1353 - it's not the master
1355 Any errors are signalled by raising errors.OpPrereqError.
1358 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1360 raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1362 instance_list = self.cfg.GetInstanceList()
1364 masternode = self.sstore.GetMasterNode()
1365 if node.name == masternode:
1366 raise errors.OpPrereqError("Node is the master node,"
1367 " you need to failover first.")
1369 for instance_name in instance_list:
1370 instance = self.cfg.GetInstanceInfo(instance_name)
1371 if node.name == instance.primary_node:
1372 raise errors.OpPrereqError("Instance %s still running on the node,"
1373 " please remove first." % instance_name)
1374 if node.name in instance.secondary_nodes:
1375 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1376 " please remove first." % instance_name)
1377 self.op.node_name = node.name
1380 def Exec(self, feedback_fn):
1381 """Removes the node from the cluster.
1385 logger.Info("stopping the node daemon and removing configs from node %s" %
1388 rpc.call_node_leave_cluster(node.name)
1390 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1392 logger.Info("Removing node %s from config" % node.name)
1394 self.cfg.RemoveNode(node.name)
1396 _RemoveHostFromEtcHosts(node.name)
1399 class LUQueryNodes(NoHooksLU):
1400 """Logical unit for querying nodes.
1403 _OP_REQP = ["output_fields", "names"]
1405 def CheckPrereq(self):
1406 """Check prerequisites.
1408 This checks that the fields required are valid output fields.
1411 self.dynamic_fields = frozenset(["dtotal", "dfree",
1412 "mtotal", "mnode", "mfree",
1415 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1416 "pinst_list", "sinst_list",
1418 dynamic=self.dynamic_fields,
1419 selected=self.op.output_fields)
1421 self.wanted = _GetWantedNodes(self, self.op.names)
1423 def Exec(self, feedback_fn):
1424 """Computes the list of nodes and their attributes.
1427 nodenames = self.wanted
1428 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1430 # begin data gathering
1432 if self.dynamic_fields.intersection(self.op.output_fields):
1434 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1435 for name in nodenames:
1436 nodeinfo = node_data.get(name, None)
1439 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1440 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1441 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1442 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1443 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1444 "bootid": nodeinfo['bootid'],
1447 live_data[name] = {}
1449 live_data = dict.fromkeys(nodenames, {})
1451 node_to_primary = dict([(name, set()) for name in nodenames])
1452 node_to_secondary = dict([(name, set()) for name in nodenames])
1454 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1455 "sinst_cnt", "sinst_list"))
1456 if inst_fields & frozenset(self.op.output_fields):
1457 instancelist = self.cfg.GetInstanceList()
1459 for instance_name in instancelist:
1460 inst = self.cfg.GetInstanceInfo(instance_name)
1461 if inst.primary_node in node_to_primary:
1462 node_to_primary[inst.primary_node].add(inst.name)
1463 for secnode in inst.secondary_nodes:
1464 if secnode in node_to_secondary:
1465 node_to_secondary[secnode].add(inst.name)
1467 # end data gathering
1470 for node in nodelist:
1472 for field in self.op.output_fields:
1475 elif field == "pinst_list":
1476 val = list(node_to_primary[node.name])
1477 elif field == "sinst_list":
1478 val = list(node_to_secondary[node.name])
1479 elif field == "pinst_cnt":
1480 val = len(node_to_primary[node.name])
1481 elif field == "sinst_cnt":
1482 val = len(node_to_secondary[node.name])
1483 elif field == "pip":
1484 val = node.primary_ip
1485 elif field == "sip":
1486 val = node.secondary_ip
1487 elif field in self.dynamic_fields:
1488 val = live_data[node.name].get(field, None)
1490 raise errors.ParameterError(field)
1491 node_output.append(val)
1492 output.append(node_output)
1497 class LUQueryNodeVolumes(NoHooksLU):
1498 """Logical unit for getting volumes on node(s).
1501 _OP_REQP = ["nodes", "output_fields"]
1503 def CheckPrereq(self):
1504 """Check prerequisites.
1506 This checks that the fields required are valid output fields.
1509 self.nodes = _GetWantedNodes(self, self.op.nodes)
1511 _CheckOutputFields(static=["node"],
1512 dynamic=["phys", "vg", "name", "size", "instance"],
1513 selected=self.op.output_fields)
1516 def Exec(self, feedback_fn):
1517 """Computes the list of nodes and their attributes.
1520 nodenames = self.nodes
1521 volumes = rpc.call_node_volumes(nodenames)
1523 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1524 in self.cfg.GetInstanceList()]
1526 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1529 for node in nodenames:
1530 if node not in volumes or not volumes[node]:
1533 node_vols = volumes[node][:]
1534 node_vols.sort(key=lambda vol: vol['dev'])
1536 for vol in node_vols:
1538 for field in self.op.output_fields:
1541 elif field == "phys":
1545 elif field == "name":
1547 elif field == "size":
1548 val = int(float(vol['size']))
1549 elif field == "instance":
1551 if node not in lv_by_node[inst]:
1553 if vol['name'] in lv_by_node[inst][node]:
1559 raise errors.ParameterError(field)
1560 node_output.append(str(val))
1562 output.append(node_output)
1567 class LUAddNode(LogicalUnit):
1568 """Logical unit for adding node to the cluster.
1572 HTYPE = constants.HTYPE_NODE
1573 _OP_REQP = ["node_name"]
1575 def BuildHooksEnv(self):
1578 This will run on all nodes before, and on all nodes + the new node after.
1582 "OP_TARGET": self.op.node_name,
1583 "NODE_NAME": self.op.node_name,
1584 "NODE_PIP": self.op.primary_ip,
1585 "NODE_SIP": self.op.secondary_ip,
1587 nodes_0 = self.cfg.GetNodeList()
1588 nodes_1 = nodes_0 + [self.op.node_name, ]
1589 return env, nodes_0, nodes_1
1591 def CheckPrereq(self):
1592 """Check prerequisites.
1595 - the new node is not already in the config
1597 - its parameters (single/dual homed) matches the cluster
1599 Any errors are signalled by raising errors.OpPrereqError.
1602 node_name = self.op.node_name
1605 dns_data = utils.HostInfo(node_name)
1607 node = dns_data.name
1608 primary_ip = self.op.primary_ip = dns_data.ip
1609 secondary_ip = getattr(self.op, "secondary_ip", None)
1610 if secondary_ip is None:
1611 secondary_ip = primary_ip
1612 if not utils.IsValidIP(secondary_ip):
1613 raise errors.OpPrereqError("Invalid secondary IP given")
1614 self.op.secondary_ip = secondary_ip
1615 node_list = cfg.GetNodeList()
1616 if node in node_list:
1617 raise errors.OpPrereqError("Node %s is already in the configuration"
1620 for existing_node_name in node_list:
1621 existing_node = cfg.GetNodeInfo(existing_node_name)
1622 if (existing_node.primary_ip == primary_ip or
1623 existing_node.secondary_ip == primary_ip or
1624 existing_node.primary_ip == secondary_ip or
1625 existing_node.secondary_ip == secondary_ip):
1626 raise errors.OpPrereqError("New node ip address(es) conflict with"
1627 " existing node %s" % existing_node.name)
1629 # check that the type of the node (single versus dual homed) is the
1630 # same as for the master
1631 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1632 master_singlehomed = myself.secondary_ip == myself.primary_ip
1633 newbie_singlehomed = secondary_ip == primary_ip
1634 if master_singlehomed != newbie_singlehomed:
1635 if master_singlehomed:
1636 raise errors.OpPrereqError("The master has no private ip but the"
1637 " new node has one")
1639 raise errors.OpPrereqError("The master has a private ip but the"
1640 " new node doesn't have one")
1642 # checks reachability
1643 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1644 raise errors.OpPrereqError("Node not reachable by ping")
1646 if not newbie_singlehomed:
1647 # check reachability from my secondary ip to newbie's secondary ip
1648 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1649 source=myself.secondary_ip):
1650 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1651 " based ping to noded port")
1653 self.new_node = objects.Node(name=node,
1654 primary_ip=primary_ip,
1655 secondary_ip=secondary_ip)
1657 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1658 if not os.path.exists(constants.VNC_PASSWORD_FILE):
1659 raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1660 constants.VNC_PASSWORD_FILE)
1662 def Exec(self, feedback_fn):
1663 """Adds the new node to the cluster.
1666 new_node = self.new_node
1667 node = new_node.name
1669 # set up inter-node password and certificate and restarts the node daemon
1670 gntpass = self.sstore.GetNodeDaemonPassword()
1671 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1672 raise errors.OpExecError("ganeti password corruption detected")
1673 f = open(constants.SSL_CERT_FILE)
1675 gntpem = f.read(8192)
1678 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1679 # so we use this to detect an invalid certificate; as long as the
1680 # cert doesn't contain this, the here-document will be correctly
1681 # parsed by the shell sequence below
1682 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1683 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1684 if not gntpem.endswith("\n"):
1685 raise errors.OpExecError("PEM must end with newline")
1686 logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1688 # and then connect with ssh to set password and start ganeti-noded
1689 # note that all the below variables are sanitized at this point,
1690 # either by being constants or by the checks above
1692 mycommand = ("umask 077 && "
1693 "echo '%s' > '%s' && "
1694 "cat > '%s' << '!EOF.' && \n"
1695 "%s!EOF.\n%s restart" %
1696 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1697 constants.SSL_CERT_FILE, gntpem,
1698 constants.NODE_INITD_SCRIPT))
1700 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1702 raise errors.OpExecError("Remote command on node %s, error: %s,"
1704 (node, result.fail_reason, result.output))
1706 # check connectivity
1709 result = rpc.call_version([node])[node]
1711 if constants.PROTOCOL_VERSION == result:
1712 logger.Info("communication to node %s fine, sw version %s match" %
1715 raise errors.OpExecError("Version mismatch master version %s,"
1716 " node version %s" %
1717 (constants.PROTOCOL_VERSION, result))
1719 raise errors.OpExecError("Cannot get version from the new node")
1722 logger.Info("copy ssh key to node %s" % node)
1723 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1725 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1726 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1732 keyarray.append(f.read())
1736 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1737 keyarray[3], keyarray[4], keyarray[5])
1740 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1742 # Add node to our /etc/hosts, and add key to known_hosts
1743 _AddHostToEtcHosts(new_node.name)
1745 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1746 self.cfg.GetHostKey())
1748 if new_node.secondary_ip != new_node.primary_ip:
1749 if not rpc.call_node_tcp_ping(new_node.name,
1750 constants.LOCALHOST_IP_ADDRESS,
1751 new_node.secondary_ip,
1752 constants.DEFAULT_NODED_PORT,
1754 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1755 " you gave (%s). Please fix and re-run this"
1756 " command." % new_node.secondary_ip)
1758 success, msg = ssh.VerifyNodeHostname(node)
1760 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1761 " than the one the resolver gives: %s."
1762 " Please fix and re-run this command." %
1765 # Distribute updated /etc/hosts and known_hosts to all nodes,
1766 # including the node just added
1767 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1768 dist_nodes = self.cfg.GetNodeList() + [node]
1769 if myself.name in dist_nodes:
1770 dist_nodes.remove(myself.name)
1772 logger.Debug("Copying hosts and known_hosts to all nodes")
1773 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1774 result = rpc.call_upload_file(dist_nodes, fname)
1775 for to_node in dist_nodes:
1776 if not result[to_node]:
1777 logger.Error("copy of file %s to node %s failed" %
1780 to_copy = ss.GetFileList()
1781 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1782 to_copy.append(constants.VNC_PASSWORD_FILE)
1783 for fname in to_copy:
1784 if not ssh.CopyFileToNode(node, fname):
1785 logger.Error("could not copy file %s to node %s" % (fname, node))
1787 logger.Info("adding node %s to cluster.conf" % node)
1788 self.cfg.AddNode(new_node)
1791 class LUMasterFailover(LogicalUnit):
1792 """Failover the master node to the current node.
1794 This is a special LU in that it must run on a non-master node.
1797 HPATH = "master-failover"
1798 HTYPE = constants.HTYPE_CLUSTER
1802 def BuildHooksEnv(self):
1805 This will run on the new master only in the pre phase, and on all
1806 the nodes in the post phase.
1810 "OP_TARGET": self.new_master,
1811 "NEW_MASTER": self.new_master,
1812 "OLD_MASTER": self.old_master,
1814 return env, [self.new_master], self.cfg.GetNodeList()
1816 def CheckPrereq(self):
1817 """Check prerequisites.
1819 This checks that we are not already the master.
1822 self.new_master = utils.HostInfo().name
1823 self.old_master = self.sstore.GetMasterNode()
1825 if self.old_master == self.new_master:
1826 raise errors.OpPrereqError("This commands must be run on the node"
1827 " where you want the new master to be."
1828 " %s is already the master" %
1831 def Exec(self, feedback_fn):
1832 """Failover the master node.
1834 This command, when run on a non-master node, will cause the current
1835 master to cease being master, and the non-master to become new
1839 #TODO: do not rely on gethostname returning the FQDN
1840 logger.Info("setting master to %s, old master: %s" %
1841 (self.new_master, self.old_master))
1843 if not rpc.call_node_stop_master(self.old_master):
1844 logger.Error("could disable the master role on the old master"
1845 " %s, please disable manually" % self.old_master)
1848 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1849 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1850 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1851 logger.Error("could not distribute the new simple store master file"
1852 " to the other nodes, please check.")
1854 if not rpc.call_node_start_master(self.new_master):
1855 logger.Error("could not start the master role on the new master"
1856 " %s, please check" % self.new_master)
1857 feedback_fn("Error in activating the master IP on the new master,"
1858 " please fix manually.")
1862 class LUQueryClusterInfo(NoHooksLU):
1863 """Query cluster configuration.
1869 def CheckPrereq(self):
1870 """No prerequsites needed for this LU.
1875 def Exec(self, feedback_fn):
1876 """Return cluster config.
1880 "name": self.sstore.GetClusterName(),
1881 "software_version": constants.RELEASE_VERSION,
1882 "protocol_version": constants.PROTOCOL_VERSION,
1883 "config_version": constants.CONFIG_VERSION,
1884 "os_api_version": constants.OS_API_VERSION,
1885 "export_version": constants.EXPORT_VERSION,
1886 "master": self.sstore.GetMasterNode(),
1887 "architecture": (platform.architecture()[0], platform.machine()),
1893 class LUClusterCopyFile(NoHooksLU):
1894 """Copy file to cluster.
1897 _OP_REQP = ["nodes", "filename"]
1899 def CheckPrereq(self):
1900 """Check prerequisites.
1902 It should check that the named file exists and that the given list
1906 if not os.path.exists(self.op.filename):
1907 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1909 self.nodes = _GetWantedNodes(self, self.op.nodes)
1911 def Exec(self, feedback_fn):
1912 """Copy a file from master to some nodes.
1915 opts - class with options as members
1916 args - list containing a single element, the file name
1918 nodes - list containing the name of target nodes; if empty, all nodes
1921 filename = self.op.filename
1923 myname = utils.HostInfo().name
1925 for node in self.nodes:
1928 if not ssh.CopyFileToNode(node, filename):
1929 logger.Error("Copy of file %s to node %s failed" % (filename, node))
1932 class LUDumpClusterConfig(NoHooksLU):
1933 """Return a text-representation of the cluster-config.
1938 def CheckPrereq(self):
1939 """No prerequisites.
1944 def Exec(self, feedback_fn):
1945 """Dump a representation of the cluster config to the standard output.
1948 return self.cfg.DumpConfig()
1951 class LURunClusterCommand(NoHooksLU):
1952 """Run a command on some nodes.
1955 _OP_REQP = ["command", "nodes"]
1957 def CheckPrereq(self):
1958 """Check prerequisites.
1960 It checks that the given list of nodes is valid.
1963 self.nodes = _GetWantedNodes(self, self.op.nodes)
1965 def Exec(self, feedback_fn):
1966 """Run a command on some nodes.
1969 # put the master at the end of the nodes list
1970 master_node = self.sstore.GetMasterNode()
1971 if master_node in self.nodes:
1972 self.nodes.remove(master_node)
1973 self.nodes.append(master_node)
1976 for node in self.nodes:
1977 result = ssh.SSHCall(node, "root", self.op.command)
1978 data.append((node, result.output, result.exit_code))
1983 class LUActivateInstanceDisks(NoHooksLU):
1984 """Bring up an instance's disks.
1987 _OP_REQP = ["instance_name"]
1989 def CheckPrereq(self):
1990 """Check prerequisites.
1992 This checks that the instance is in the cluster.
1995 instance = self.cfg.GetInstanceInfo(
1996 self.cfg.ExpandInstanceName(self.op.instance_name))
1997 if instance is None:
1998 raise errors.OpPrereqError("Instance '%s' not known" %
1999 self.op.instance_name)
2000 self.instance = instance
2003 def Exec(self, feedback_fn):
2004 """Activate the disks.
2007 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
2009 raise errors.OpExecError("Cannot activate block devices")
2014 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
2015 """Prepare the block devices for an instance.
2017 This sets up the block devices on all nodes.
2020 instance: a ganeti.objects.Instance object
2021 ignore_secondaries: if true, errors on secondary nodes won't result
2022 in an error return from the function
2025 false if the operation failed
2026 list of (host, instance_visible_name, node_visible_name) if the operation
2027 succeeded with the mapping from node devices to instance devices
2031 iname = instance.name
2032 # With the two passes mechanism we try to reduce the window of
2033 # opportunity for the race condition of switching DRBD to primary
2034 # before handshaking occurred, but we do not eliminate it
2036 # The proper fix would be to wait (with some limits) until the
2037 # connection has been made and drbd transitions from WFConnection
2038 # into any other network-connected state (Connected, SyncTarget,
2041 # 1st pass, assemble on all nodes in secondary mode
2042 for inst_disk in instance.disks:
2043 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2044 cfg.SetDiskID(node_disk, node)
2045 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
2047 logger.Error("could not prepare block device %s on node %s"
2048 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2049 if not ignore_secondaries:
2052 # FIXME: race condition on drbd migration to primary
2054 # 2nd pass, do only the primary node
2055 for inst_disk in instance.disks:
2056 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2057 if node != instance.primary_node:
2059 cfg.SetDiskID(node_disk, node)
2060 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2062 logger.Error("could not prepare block device %s on node %s"
2063 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2065 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2067 # leave the disks configured for the primary node
2068 # this is a workaround that would be fixed better by
2069 # improving the logical/physical id handling
2070 for disk in instance.disks:
2071 cfg.SetDiskID(disk, instance.primary_node)
2073 return disks_ok, device_info
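# Illustrative result (hypothetical values): on success the function returns
# something like
#
#   (True, [("node1.example.com", "sda", "/dev/drbd0"),
#           ("node1.example.com", "sdb", "/dev/drbd1")])
#
# i.e. one (primary node, instance-visible name, node-visible device) tuple
# per disk, as described in the docstring above.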
2076 def _StartInstanceDisks(cfg, instance, force):
2077 """Start the disks of an instance.
2080 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2081 ignore_secondaries=force)
2083 _ShutdownInstanceDisks(instance, cfg)
2084 if force is not None and not force:
2085 logger.Error("If the message above refers to a secondary node,"
2086 " you can retry the operation using '--force'.")
2087 raise errors.OpExecError("Disk consistency error")
2090 class LUDeactivateInstanceDisks(NoHooksLU):
2091 """Shutdown an instance's disks.
2094 _OP_REQP = ["instance_name"]
2096 def CheckPrereq(self):
2097 """Check prerequisites.
2099 This checks that the instance is in the cluster.
2102 instance = self.cfg.GetInstanceInfo(
2103 self.cfg.ExpandInstanceName(self.op.instance_name))
2104 if instance is None:
2105 raise errors.OpPrereqError("Instance '%s' not known" %
2106 self.op.instance_name)
2107 self.instance = instance
2109 def Exec(self, feedback_fn):
2110 """Deactivate the disks
2113 instance = self.instance
2114 ins_l = rpc.call_instance_list([instance.primary_node])
2115 ins_l = ins_l[instance.primary_node]
2116 if not type(ins_l) is list:
2117 raise errors.OpExecError("Can't contact node '%s'" %
2118 instance.primary_node)
2120 if self.instance.name in ins_l:
2121 raise errors.OpExecError("Instance is running, can't shutdown"
2124 _ShutdownInstanceDisks(instance, self.cfg)
2127 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2128 """Shutdown block devices of an instance.
2130 This does the shutdown on all nodes of the instance.
2132 If ignore_primary is false, errors on the primary node are
2137 for disk in instance.disks:
2138 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2139 cfg.SetDiskID(top_disk, node)
2140 if not rpc.call_blockdev_shutdown(node, top_disk):
2141 logger.Error("could not shutdown block device %s on node %s" %
2142 (disk.iv_name, node))
2143 if not ignore_primary or node != instance.primary_node:
2148 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2149 """Checks if a node has enough free memory.
2151 This function checks whether a given node has the needed amount of free
2152 memory. If the node has less memory, or if the information cannot be
2153 retrieved from the node, this function raises an OpPrereqError exception.
2157 - cfg: a ConfigWriter instance
2158 - node: the node name
2159 - reason: string to use in the error message
2160 - requested: the amount of memory in MiB
2163 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2164 if not nodeinfo or not isinstance(nodeinfo, dict):
2165 raise errors.OpPrereqError("Could not contact node %s for resource"
2166 " information" % (node,))
2168 free_mem = nodeinfo[node].get('memory_free')
2169 if not isinstance(free_mem, int):
2170 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2171 " was '%s'" % (node, free_mem))
2172 if requested > free_mem:
2173 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2174 " needed %s MiB, available %s MiB" %
2175 (node, reason, requested, free_mem))
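# Illustrative sketch (not part of the original code): callers pass a
# human-readable reason so the raised OpPrereqError explains what needed the
# memory, for example:
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)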
2178 class LUStartupInstance(LogicalUnit):
2179 """Starts an instance.
2182 HPATH = "instance-start"
2183 HTYPE = constants.HTYPE_INSTANCE
2184 _OP_REQP = ["instance_name", "force"]
2186 def BuildHooksEnv(self):
2189 This runs on master, primary and secondary nodes of the instance.
2193 "FORCE": self.op.force,
2195 env.update(_BuildInstanceHookEnvByObject(self.instance))
2196 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2197 list(self.instance.secondary_nodes))
2200 def CheckPrereq(self):
2201 """Check prerequisites.
2203 This checks that the instance is in the cluster.
2206 instance = self.cfg.GetInstanceInfo(
2207 self.cfg.ExpandInstanceName(self.op.instance_name))
2208 if instance is None:
2209 raise errors.OpPrereqError("Instance '%s' not known" %
2210 self.op.instance_name)
2212 # check bridges existence
2213 _CheckInstanceBridgesExist(instance)
2215 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2216 "starting instance %s" % instance.name,
2219 self.instance = instance
2220 self.op.instance_name = instance.name
2222 def Exec(self, feedback_fn):
2223 """Start the instance.
2226 instance = self.instance
2227 force = self.op.force
2228 extra_args = getattr(self.op, "extra_args", "")
2230 self.cfg.MarkInstanceUp(instance.name)
2232 node_current = instance.primary_node
2234 _StartInstanceDisks(self.cfg, instance, force)
2236 if not rpc.call_instance_start(node_current, instance, extra_args):
2237 _ShutdownInstanceDisks(instance, self.cfg)
2238 raise errors.OpExecError("Could not start instance")
2241 class LURebootInstance(LogicalUnit):
2242 """Reboot an instance.
2245 HPATH = "instance-reboot"
2246 HTYPE = constants.HTYPE_INSTANCE
2247 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2249 def BuildHooksEnv(self):
2252 This runs on master, primary and secondary nodes of the instance.
2256 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2258 env.update(_BuildInstanceHookEnvByObject(self.instance))
2259 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2260 list(self.instance.secondary_nodes))
2263 def CheckPrereq(self):
2264 """Check prerequisites.
2266 This checks that the instance is in the cluster.
2269 instance = self.cfg.GetInstanceInfo(
2270 self.cfg.ExpandInstanceName(self.op.instance_name))
2271 if instance is None:
2272 raise errors.OpPrereqError("Instance '%s' not known" %
2273 self.op.instance_name)
2275 # check bridges existence
2276 _CheckInstanceBridgesExist(instance)
2278 self.instance = instance
2279 self.op.instance_name = instance.name
2281 def Exec(self, feedback_fn):
2282 """Reboot the instance.
2285 instance = self.instance
2286 ignore_secondaries = self.op.ignore_secondaries
2287 reboot_type = self.op.reboot_type
2288 extra_args = getattr(self.op, "extra_args", "")
2290 node_current = instance.primary_node
2292 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2293 constants.INSTANCE_REBOOT_HARD,
2294 constants.INSTANCE_REBOOT_FULL]:
2295 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2296 (constants.INSTANCE_REBOOT_SOFT,
2297 constants.INSTANCE_REBOOT_HARD,
2298 constants.INSTANCE_REBOOT_FULL))
2300 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2301 constants.INSTANCE_REBOOT_HARD]:
2302 if not rpc.call_instance_reboot(node_current, instance,
2303 reboot_type, extra_args):
2304 raise errors.OpExecError("Could not reboot instance")
2306 if not rpc.call_instance_shutdown(node_current, instance):
2307 raise errors.OpExecError("could not shutdown instance for full reboot")
2308 _ShutdownInstanceDisks(instance, self.cfg)
2309 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2310 if not rpc.call_instance_start(node_current, instance, extra_args):
2311 _ShutdownInstanceDisks(instance, self.cfg)
2312 raise errors.OpExecError("Could not start instance for full reboot")
2314 self.cfg.MarkInstanceUp(instance.name)
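# Illustrative note (not part of the original code): soft and hard reboots are
# delegated to a single reboot RPC, while a full reboot is a shutdown, disk
# deactivation/reactivation and start, as implemented above. Assuming the
# corresponding opcode class is opcodes.OpRebootInstance, a full reboot would
# be requested as:
#
#   op = opcodes.OpRebootInstance(instance_name="inst1.example.com",
#                                 reboot_type=constants.INSTANCE_REBOOT_FULL,
#                                 ignore_secondaries=False)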
2317 class LUShutdownInstance(LogicalUnit):
2318 """Shutdown an instance.
2321 HPATH = "instance-stop"
2322 HTYPE = constants.HTYPE_INSTANCE
2323 _OP_REQP = ["instance_name"]
2325 def BuildHooksEnv(self):
2328 This runs on master, primary and secondary nodes of the instance.
2331 env = _BuildInstanceHookEnvByObject(self.instance)
2332 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2333 list(self.instance.secondary_nodes))
2336 def CheckPrereq(self):
2337 """Check prerequisites.
2339 This checks that the instance is in the cluster.
2342 instance = self.cfg.GetInstanceInfo(
2343 self.cfg.ExpandInstanceName(self.op.instance_name))
2344 if instance is None:
2345 raise errors.OpPrereqError("Instance '%s' not known" %
2346 self.op.instance_name)
2347 self.instance = instance
2349 def Exec(self, feedback_fn):
2350 """Shutdown the instance.
2353 instance = self.instance
2354 node_current = instance.primary_node
2355 self.cfg.MarkInstanceDown(instance.name)
2356 if not rpc.call_instance_shutdown(node_current, instance):
2357 logger.Error("could not shutdown instance")
2359 _ShutdownInstanceDisks(instance, self.cfg)
2362 class LUReinstallInstance(LogicalUnit):
2363 """Reinstall an instance.
2366 HPATH = "instance-reinstall"
2367 HTYPE = constants.HTYPE_INSTANCE
2368 _OP_REQP = ["instance_name"]
2370 def BuildHooksEnv(self):
2373 This runs on master, primary and secondary nodes of the instance.
2376 env = _BuildInstanceHookEnvByObject(self.instance)
2377 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2378 list(self.instance.secondary_nodes))
2381 def CheckPrereq(self):
2382 """Check prerequisites.
2384 This checks that the instance is in the cluster and is not running.
2387 instance = self.cfg.GetInstanceInfo(
2388 self.cfg.ExpandInstanceName(self.op.instance_name))
2389 if instance is None:
2390 raise errors.OpPrereqError("Instance '%s' not known" %
2391 self.op.instance_name)
2392 if instance.disk_template == constants.DT_DISKLESS:
2393 raise errors.OpPrereqError("Instance '%s' has no disks" %
2394 self.op.instance_name)
2395 if instance.status != "down":
2396 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2397 self.op.instance_name)
2398 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2400 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2401 (self.op.instance_name,
2402 instance.primary_node))
2404 self.op.os_type = getattr(self.op, "os_type", None)
2405 if self.op.os_type is not None:
2407 pnode = self.cfg.GetNodeInfo(
2408 self.cfg.ExpandNodeName(instance.primary_node))
2410 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2412 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2414 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2415 " primary node" % self.op.os_type)
2417 self.instance = instance
2419 def Exec(self, feedback_fn):
2420 """Reinstall the instance.
2423 inst = self.instance
2425 if self.op.os_type is not None:
2426 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2427 inst.os = self.op.os_type
2428 self.cfg.AddInstance(inst)
2430 _StartInstanceDisks(self.cfg, inst, None)
2432 feedback_fn("Running the instance OS create scripts...")
2433 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2434 raise errors.OpExecError("Could not install OS for instance %s"
2436 (inst.name, inst.primary_node))
2438 _ShutdownInstanceDisks(inst, self.cfg)
2441 class LURenameInstance(LogicalUnit):
2442 """Rename an instance.
2445 HPATH = "instance-rename"
2446 HTYPE = constants.HTYPE_INSTANCE
2447 _OP_REQP = ["instance_name", "new_name"]
2449 def BuildHooksEnv(self):
2452 This runs on master, primary and secondary nodes of the instance.
2455 env = _BuildInstanceHookEnvByObject(self.instance)
2456 env["INSTANCE_NEW_NAME"] = self.op.new_name
2457 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2458 list(self.instance.secondary_nodes))
2461 def CheckPrereq(self):
2462 """Check prerequisites.
2464 This checks that the instance is in the cluster and is not running.
2467 instance = self.cfg.GetInstanceInfo(
2468 self.cfg.ExpandInstanceName(self.op.instance_name))
2469 if instance is None:
2470 raise errors.OpPrereqError("Instance '%s' not known" %
2471 self.op.instance_name)
2472 if instance.status != "down":
2473 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2474 self.op.instance_name)
2475 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2477 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2478 (self.op.instance_name,
2479 instance.primary_node))
2480 self.instance = instance
2482 # new name verification
2483 name_info = utils.HostInfo(self.op.new_name)
2485 self.op.new_name = new_name = name_info.name
2486 instance_list = self.cfg.GetInstanceList()
2487 if new_name in instance_list:
2488 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2491 if not getattr(self.op, "ignore_ip", False):
2492 command = ["fping", "-q", name_info.ip]
2493 result = utils.RunCmd(command)
2494 if not result.failed:
2495 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2496 (name_info.ip, new_name))
2499 def Exec(self, feedback_fn):
2500 """Reinstall the instance.
2503 inst = self.instance
2504 old_name = inst.name
2506 self.cfg.RenameInstance(inst.name, self.op.new_name)
2508 # re-read the instance from the configuration after rename
2509 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2511 _StartInstanceDisks(self.cfg, inst, None)
2513 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2515 msg = ("Could not run OS rename script for instance %s on node %s (but the"
2516 " instance has been renamed in Ganeti)" %
2517 (inst.name, inst.primary_node))
2520 _ShutdownInstanceDisks(inst, self.cfg)
2523 class LURemoveInstance(LogicalUnit):
2524 """Remove an instance.
2527 HPATH = "instance-remove"
2528 HTYPE = constants.HTYPE_INSTANCE
2529 _OP_REQP = ["instance_name"]
2531 def BuildHooksEnv(self):
2534 This runs on master, primary and secondary nodes of the instance.
2537 env = _BuildInstanceHookEnvByObject(self.instance)
2538 nl = [self.sstore.GetMasterNode()]
2541 def CheckPrereq(self):
2542 """Check prerequisites.
2544 This checks that the instance is in the cluster.
2547 instance = self.cfg.GetInstanceInfo(
2548 self.cfg.ExpandInstanceName(self.op.instance_name))
2549 if instance is None:
2550 raise errors.OpPrereqError("Instance '%s' not known" %
2551 self.op.instance_name)
2552 self.instance = instance
2554 def Exec(self, feedback_fn):
2555 """Remove the instance.
2558 instance = self.instance
2559 logger.Info("shutting down instance %s on node %s" %
2560 (instance.name, instance.primary_node))
2562 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2563 if self.op.ignore_failures:
2564 feedback_fn("Warning: can't shutdown instance")
2566 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2567 (instance.name, instance.primary_node))
2569 logger.Info("removing block devices for instance %s" % instance.name)
2571 if not _RemoveDisks(instance, self.cfg):
2572 if self.op.ignore_failures:
2573 feedback_fn("Warning: can't remove instance's disks")
2575 raise errors.OpExecError("Can't remove instance's disks")
2577 logger.Info("removing instance %s out of cluster config" % instance.name)
2579 self.cfg.RemoveInstance(instance.name)
2582 class LUQueryInstances(NoHooksLU):
2583 """Logical unit for querying instances.
2586 _OP_REQP = ["output_fields", "names"]
2588 def CheckPrereq(self):
2589 """Check prerequisites.
2591 This checks that the fields required are valid output fields.
2594 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2595 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2596 "admin_state", "admin_ram",
2597 "disk_template", "ip", "mac", "bridge",
2598 "sda_size", "sdb_size", "vcpus"],
2599 dynamic=self.dynamic_fields,
2600 selected=self.op.output_fields)
2602 self.wanted = _GetWantedInstances(self, self.op.names)
2604 def Exec(self, feedback_fn):
2605 """Computes the list of nodes and their attributes.
2608 instance_names = self.wanted
2609 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2612 # begin data gathering
2614 nodes = frozenset([inst.primary_node for inst in instance_list])
2617 if self.dynamic_fields.intersection(self.op.output_fields):
2619 node_data = rpc.call_all_instances_info(nodes)
2621 result = node_data[name]
2623 live_data.update(result)
2624 elif result == False:
2625 bad_nodes.append(name)
2626 # else no instance is alive
2628 live_data = dict([(name, {}) for name in instance_names])
2630 # end data gathering
2633 for instance in instance_list:
2635 for field in self.op.output_fields:
2640 elif field == "pnode":
2641 val = instance.primary_node
2642 elif field == "snodes":
2643 val = list(instance.secondary_nodes)
2644 elif field == "admin_state":
2645 val = (instance.status != "down")
2646 elif field == "oper_state":
2647 if instance.primary_node in bad_nodes:
2650 val = bool(live_data.get(instance.name))
2651 elif field == "status":
2652 if instance.primary_node in bad_nodes:
2653 val = "ERROR_nodedown"
2655 running = bool(live_data.get(instance.name))
2657 if instance.status != "down":
2662 if instance.status != "down":
2666 elif field == "admin_ram":
2667 val = instance.memory
2668 elif field == "oper_ram":
2669 if instance.primary_node in bad_nodes:
2671 elif instance.name in live_data:
2672 val = live_data[instance.name].get("memory", "?")
2675 elif field == "disk_template":
2676 val = instance.disk_template
2678 val = instance.nics[0].ip
2679 elif field == "bridge":
2680 val = instance.nics[0].bridge
2681 elif field == "mac":
2682 val = instance.nics[0].mac
2683 elif field == "sda_size" or field == "sdb_size":
2684 disk = instance.FindDisk(field[:3])
2689 elif field == "vcpus":
2690 val = instance.vcpus
2692 raise errors.ParameterError(field)
2699 class LUFailoverInstance(LogicalUnit):
2700 """Failover an instance.
2703 HPATH = "instance-failover"
2704 HTYPE = constants.HTYPE_INSTANCE
2705 _OP_REQP = ["instance_name", "ignore_consistency"]
2707 def BuildHooksEnv(self):
2710 This runs on master, primary and secondary nodes of the instance.
2714 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2716 env.update(_BuildInstanceHookEnvByObject(self.instance))
2717 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2720 def CheckPrereq(self):
2721 """Check prerequisites.
2723 This checks that the instance is in the cluster.
2726 instance = self.cfg.GetInstanceInfo(
2727 self.cfg.ExpandInstanceName(self.op.instance_name))
2728 if instance is None:
2729 raise errors.OpPrereqError("Instance '%s' not known" %
2730 self.op.instance_name)
2732 if instance.disk_template not in constants.DTS_NET_MIRROR:
2733 raise errors.OpPrereqError("Instance's disk layout is not"
2734 " network mirrored, cannot failover.")
2736 secondary_nodes = instance.secondary_nodes
2737 if not secondary_nodes:
2738 raise errors.ProgrammerError("no secondary node but using "
2739 "DT_REMOTE_RAID1 template")
2741 target_node = secondary_nodes[0]
2742 # check memory requirements on the secondary node
2743 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2744 instance.name, instance.memory)
2746 # check bridge existence
2747 brlist = [nic.bridge for nic in instance.nics]
2748 if not rpc.call_bridges_exist(target_node, brlist):
2749 raise errors.OpPrereqError("One or more target bridges %s do not"
2750 " exist on destination node '%s'" %
2751 (brlist, target_node))
2753 self.instance = instance
2755 def Exec(self, feedback_fn):
2756 """Failover an instance.
2758 The failover is done by shutting it down on its present node and
2759 starting it on the secondary.
2762 instance = self.instance
2764 source_node = instance.primary_node
2765 target_node = instance.secondary_nodes[0]
2767 feedback_fn("* checking disk consistency between source and target")
2768 for dev in instance.disks:
2769 # for remote_raid1, these are md over drbd
2770 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2771 if instance.status == "up" and not self.op.ignore_consistency:
2772 raise errors.OpExecError("Disk %s is degraded on target node,"
2773 " aborting failover." % dev.iv_name)
2775 feedback_fn("* shutting down instance on source node")
2776 logger.Info("Shutting down instance %s on node %s" %
2777 (instance.name, source_node))
2779 if not rpc.call_instance_shutdown(source_node, instance):
2780 if self.op.ignore_consistency:
2781 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2782 " anyway. Please make sure node %s is down" %
2783 (instance.name, source_node, source_node))
2785 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2786 (instance.name, source_node))
2788 feedback_fn("* deactivating the instance's disks on source node")
2789 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2790 raise errors.OpExecError("Can't shut down the instance's disks.")
2792 instance.primary_node = target_node
2793 # distribute new instance config to the other nodes
2794 self.cfg.AddInstance(instance)
2796 # Only start the instance if it's marked as up
2797 if instance.status == "up":
2798 feedback_fn("* activating the instance's disks on target node")
2799 logger.Info("Starting instance %s on node %s" %
2800 (instance.name, target_node))
2802 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2803 ignore_secondaries=True)
2805 _ShutdownInstanceDisks(instance, self.cfg)
2806 raise errors.OpExecError("Can't activate the instance's disks")
2808 feedback_fn("* starting the instance on the target node")
2809 if not rpc.call_instance_start(target_node, instance, None):
2810 _ShutdownInstanceDisks(instance, self.cfg)
2811 raise errors.OpExecError("Could not start instance %s on node %s." %
2812 (instance.name, target_node))
2815 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2816 """Create a tree of block devices on the primary node.
2818 This always creates all devices.
2822 for child in device.children:
2823 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2826 cfg.SetDiskID(device, node)
2827 new_id = rpc.call_blockdev_create(node, device, device.size,
2828 instance.name, True, info)
2831 if device.physical_id is None:
2832 device.physical_id = new_id
2836 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2837 """Create a tree of block devices on a secondary node.
2839 If this device type has to be created on secondaries, create it and
2842 If not, just recurse to children keeping the same 'force' value.
2845 if device.CreateOnSecondary():
2848 for child in device.children:
2849 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2850 child, force, info):
2855 cfg.SetDiskID(device, node)
2856 new_id = rpc.call_blockdev_create(node, device, device.size,
2857 instance.name, False, info)
2860 if device.physical_id is None:
2861 device.physical_id = new_id
2865 def _GenerateUniqueNames(cfg, exts):
2866 """Generate a suitable LV name.
2868 This will generate a logical volume name for the given instance.
2873 new_id = cfg.GenerateUniqueID()
2874 results.append("%s%s" % (new_id, val))
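# Illustrative note (not part of the original code): each result combines a
# freshly generated unique ID with the requested suffix, so
#
#   _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta"])
#
# returns something like (IDs shown here are made up):
#
#   ["6bb5e0ab-....sda_data", "f3c9a1d2-....sda_meta"]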
2878 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2879 """Generate a drbd device complete with its children.
2882 port = cfg.AllocatePort()
2883 vgname = cfg.GetVGName()
2884 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2885 logical_id=(vgname, names[0]))
2886 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2887 logical_id=(vgname, names[1]))
2888 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2889 logical_id = (primary, secondary, port),
2890 children = [dev_data, dev_meta])
2894 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2895 """Generate a drbd8 device complete with its children.
2898 port = cfg.AllocatePort()
2899 vgname = cfg.GetVGName()
2900 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2901 logical_id=(vgname, names[0]))
2902 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2903 logical_id=(vgname, names[1]))
2904 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2905 logical_id = (primary, secondary, port),
2906 children = [dev_data, dev_meta],
2910 def _GenerateDiskTemplate(cfg, template_name,
2911 instance_name, primary_node,
2912 secondary_nodes, disk_sz, swap_sz):
2913 """Generate the entire disk layout for a given template type.
2916 # TODO: compute space requirements
2918 vgname = cfg.GetVGName()
2919 if template_name == constants.DT_DISKLESS:
2921 elif template_name == constants.DT_PLAIN:
2922 if len(secondary_nodes) != 0:
2923 raise errors.ProgrammerError("Wrong template configuration")
2925 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2926 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2927 logical_id=(vgname, names[0]),
2929 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2930 logical_id=(vgname, names[1]),
2932 disks = [sda_dev, sdb_dev]
2933 elif template_name == constants.DT_LOCAL_RAID1:
2934 if len(secondary_nodes) != 0:
2935 raise errors.ProgrammerError("Wrong template configuration")
2938 names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2939 ".sdb_m1", ".sdb_m2"])
2940 sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2941 logical_id=(vgname, names[0]))
2942 sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2943 logical_id=(vgname, names[1]))
2944 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2946 children = [sda_dev_m1, sda_dev_m2])
2947 sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2948 logical_id=(vgname, names[2]))
2949 sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2950 logical_id=(vgname, names[3]))
2951 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2953 children = [sdb_dev_m1, sdb_dev_m2])
2954 disks = [md_sda_dev, md_sdb_dev]
2955 elif template_name == constants.DT_REMOTE_RAID1:
2956 if len(secondary_nodes) != 1:
2957 raise errors.ProgrammerError("Wrong template configuration")
2958 remote_node = secondary_nodes[0]
2959 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2960 ".sdb_data", ".sdb_meta"])
2961 drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2962 disk_sz, names[0:2])
2963 md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2964 children = [drbd_sda_dev], size=disk_sz)
2965 drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2966 swap_sz, names[2:4])
2967 md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2968 children = [drbd_sdb_dev], size=swap_sz)
2969 disks = [md_sda_dev, md_sdb_dev]
2970 elif template_name == constants.DT_DRBD8:
2971 if len(secondary_nodes) != 1:
2972 raise errors.ProgrammerError("Wrong template configuration")
2973 remote_node = secondary_nodes[0]
2974 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2975 ".sdb_data", ".sdb_meta"])
2976 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2977 disk_sz, names[0:2], "sda")
2978 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2979 swap_sz, names[2:4], "sdb")
2980 disks = [drbd_sda_dev, drbd_sdb_dev]
2982 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
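# Illustrative sketch (not part of the original code): for the drbd8 template
# the helper above produces two DRBD8 disks ("sda" and "sdb"), each backed by
# a data LV and a 128 MB metadata LV on both nodes:
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_DRBD8,
#                                 instance_name, primary_node,
#                                 [secondary_node], 10240, 4096)
#   # -> [<DRBD8 "sda", 10240 MB, children=[data LV, meta LV]>,
#   #     <DRBD8 "sdb", 4096 MB, children=[data LV, meta LV]>]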
2986 def _GetInstanceInfoText(instance):
2987 """Compute that text that should be added to the disk's metadata.
2990 return "originstname+%s" % instance.name
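# Illustrative note (not part of the original code): for an instance named
# "inst1.example.com" the metadata text is simply
# "originstname+inst1.example.com".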
2993 def _CreateDisks(cfg, instance):
2994 """Create all disks for an instance.
2996 This abstracts away some work from AddInstance.
2999 instance: the instance object
3002 True or False showing the success of the creation process
3005 info = _GetInstanceInfoText(instance)
3007 for device in instance.disks:
3008 logger.Info("creating volume %s for instance %s" %
3009 (device.iv_name, instance.name))
3011 for secondary_node in instance.secondary_nodes:
3012 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3013 device, False, info):
3014 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3015 (device.iv_name, device, secondary_node))
3018 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3019 instance, device, info):
3020 logger.Error("failed to create volume %s on primary!" %
3026 def _RemoveDisks(instance, cfg):
3027 """Remove all disks for an instance.
3029 This abstracts away some work from `AddInstance()` and
3030 `RemoveInstance()`. Note that in case some of the devices couldn't
3031 be removed, the removal will continue with the other ones (compare
3032 with `_CreateDisks()`).
3035 instance: the instance object
3038 True or False showing the success of the removal process
3041 logger.Info("removing block devices for instance %s" % instance.name)
3044 for device in instance.disks:
3045 for node, disk in device.ComputeNodeTree(instance.primary_node):
3046 cfg.SetDiskID(disk, node)
3047 if not rpc.call_blockdev_remove(node, disk):
3048 logger.Error("could not remove block device %s on node %s,"
3049 " continuing anyway" %
3050 (device.iv_name, node))
3055 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3056 """Compute disk size requirements in the volume group
3058 This is currently hard-coded for the two-drive layout.
3061 # Required free disk space as a function of disk and swap space
3063 constants.DT_DISKLESS: None,
3064 constants.DT_PLAIN: disk_size + swap_size,
3065 constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3066 # 256 MB are added for drbd metadata, 128 MB for each drbd device
3067 constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3068 constants.DT_DRBD8: disk_size + swap_size + 256,
3071 if disk_template not in req_size_dict:
3072 raise errors.ProgrammerError("Disk template '%s' size requirement"
3073 " is unknown" % disk_template)
3075 return req_size_dict[disk_template]
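# Illustrative note (not part of the original code): with a 10240 MB disk and
# 4096 MB of swap the helper above yields:
#
#   _ComputeDiskSize(constants.DT_PLAIN, 10240, 4096)        # -> 14336
#   _ComputeDiskSize(constants.DT_LOCAL_RAID1, 10240, 4096)  # -> 28672
#   _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)        # -> 14592
#   _ComputeDiskSize(constants.DT_DISKLESS, 10240, 4096)     # -> None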
3078 class LUCreateInstance(LogicalUnit):
3079 """Create an instance.
3082 HPATH = "instance-add"
3083 HTYPE = constants.HTYPE_INSTANCE
3084 _OP_REQP = ["instance_name", "mem_size", "disk_size",
3085 "disk_template", "swap_size", "mode", "start", "vcpus",
3086 "wait_for_sync", "ip_check", "mac"]
3088 def _RunAllocator(self):
3089 """Run the allocator based on input opcode.
3092 disks = [{"size": self.op.disk_size, "mode": "w"},
3093 {"size": self.op.swap_size, "mode": "w"}]
3094 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3095 "bridge": self.op.bridge}]
3096 ial = IAllocator(self.cfg, self.sstore,
3097 mode=constants.IALLOCATOR_MODE_ALLOC,
3098 name=self.op.instance_name,
3099 disk_template=self.op.disk_template,
3102 vcpus=self.op.vcpus,
3103 mem_size=self.op.mem_size,
3108 ial.Run(self.op.iallocator)
3111 raise errors.OpPrereqError("Can't compute nodes using"
3112 " iallocator '%s': %s" % (self.op.iallocator,
3114 if len(ial.nodes) != ial.required_nodes:
3115 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3116 " of nodes (%s), required %s" %
3117 (len(ial.nodes), ial.required_nodes))
3118 self.op.pnode = ial.nodes[0]
3119 logger.ToStdout("Selected nodes for the instance: %s" %
3120 (", ".join(ial.nodes),))
3121 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3122 (self.op.instance_name, self.op.iallocator, ial.nodes))
3123 if ial.required_nodes == 2:
3124 self.op.snode = ial.nodes[1]
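# Illustrative note (not part of the original code): the allocator answer is a
# plain list of node names; for a mirrored template required_nodes is 2 and
# the code above takes nodes[0] as the primary and nodes[1] as the secondary,
# so a successful run might leave (node names are made up):
#
#   ial.nodes     == ["node1.example.com", "node2.example.com"]
#   self.op.pnode == "node1.example.com"
#   self.op.snode == "node2.example.com"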
3126 def BuildHooksEnv(self):
3129 This runs on master, primary and secondary nodes of the instance.
3133 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3134 "INSTANCE_DISK_SIZE": self.op.disk_size,
3135 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3136 "INSTANCE_ADD_MODE": self.op.mode,
3138 if self.op.mode == constants.INSTANCE_IMPORT:
3139 env["INSTANCE_SRC_NODE"] = self.op.src_node
3140 env["INSTANCE_SRC_PATH"] = self.op.src_path
3141 env["INSTANCE_SRC_IMAGE"] = self.src_image
3143 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3144 primary_node=self.op.pnode,
3145 secondary_nodes=self.secondaries,
3146 status=self.instance_status,
3147 os_type=self.op.os_type,
3148 memory=self.op.mem_size,
3149 vcpus=self.op.vcpus,
3150 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3153 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3158 def CheckPrereq(self):
3159 """Check prerequisites.
3162 # set optional parameters to none if they don't exist
3163 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3164 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3165 "vnc_bind_address"]:
3166 if not hasattr(self.op, attr):
3167 setattr(self.op, attr, None)
3169 if self.op.mode not in (constants.INSTANCE_CREATE,
3170 constants.INSTANCE_IMPORT):
3171 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3174 if self.op.mode == constants.INSTANCE_IMPORT:
3175 src_node = getattr(self.op, "src_node", None)
3176 src_path = getattr(self.op, "src_path", None)
3177 if src_node is None or src_path is None:
3178 raise errors.OpPrereqError("Importing an instance requires source"
3179 " node and path options")
3180 src_node_full = self.cfg.ExpandNodeName(src_node)
3181 if src_node_full is None:
3182 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3183 self.op.src_node = src_node = src_node_full
3185 if not os.path.isabs(src_path):
3186 raise errors.OpPrereqError("The source path must be absolute")
3188 export_info = rpc.call_export_info(src_node, src_path)
3191 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3193 if not export_info.has_section(constants.INISECT_EXP):
3194 raise errors.ProgrammerError("Corrupted export config")
3196 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3197 if (int(ei_version) != constants.EXPORT_VERSION):
3198 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3199 (ei_version, constants.EXPORT_VERSION))
3201 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3202 raise errors.OpPrereqError("Can't import instance with more than"
3205 # FIXME: are the old os-es, disk sizes, etc. useful?
3206 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3207 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3209 self.src_image = diskimage
3210 else: # INSTANCE_CREATE
3211 if getattr(self.op, "os_type", None) is None:
3212 raise errors.OpPrereqError("No guest OS specified")
3214 #### instance parameters check
3216 # disk template and mirror node verification
3217 if self.op.disk_template not in constants.DISK_TEMPLATES:
3218 raise errors.OpPrereqError("Invalid disk template name")
3220 # instance name verification
3221 hostname1 = utils.HostInfo(self.op.instance_name)
3223 self.op.instance_name = instance_name = hostname1.name
3224 instance_list = self.cfg.GetInstanceList()
3225 if instance_name in instance_list:
3226 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3229 # ip validity checks
3230 ip = getattr(self.op, "ip", None)
3231 if ip is None or ip.lower() == "none":
3233 elif ip.lower() == "auto":
3234 inst_ip = hostname1.ip
3236 if not utils.IsValidIP(ip):
3237 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3238 " like a valid IP" % ip)
3240 self.inst_ip = self.op.ip = inst_ip
3242 if self.op.start and not self.op.ip_check:
3243 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3244 " adding an instance in start mode")
3246 if self.op.ip_check:
3247 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3248 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3249 (hostname1.ip, instance_name))
3251 # MAC address verification
3252 if self.op.mac != "auto":
3253 if not utils.IsValidMac(self.op.mac.lower()):
3254 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3257 # bridge verification
3258 bridge = getattr(self.op, "bridge", None)
3260 self.op.bridge = self.cfg.GetDefBridge()
3262 self.op.bridge = bridge
3264 # boot order verification
3265 if self.op.hvm_boot_order is not None:
3266 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3267 raise errors.OpPrereqError("invalid boot order specified,"
3268 " must be one or more of [acdn]")
3271 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3272 raise errors.OpPrereqError("One and only one of iallocator and primary"
3273 " node must be given")
3275 if self.op.iallocator is not None:
3276 self._RunAllocator()
3278 #### node related checks
3280 # check primary node
3281 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3283 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3285 self.op.pnode = pnode.name
3287 self.secondaries = []
3289 # mirror node verification
3290 if self.op.disk_template in constants.DTS_NET_MIRROR:
3291 if getattr(self.op, "snode", None) is None:
3292 raise errors.OpPrereqError("The networked disk templates need"
3295 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3296 if snode_name is None:
3297 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3299 elif snode_name == pnode.name:
3300 raise errors.OpPrereqError("The secondary node cannot be"
3301 " the primary node.")
3302 self.secondaries.append(snode_name)
3304 req_size = _ComputeDiskSize(self.op.disk_template,
3305 self.op.disk_size, self.op.swap_size)
3307 # Check lv size requirements
3308 if req_size is not None:
3309 nodenames = [pnode.name] + self.secondaries
3310 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3311 for node in nodenames:
3312 info = nodeinfo.get(node, None)
3314 raise errors.OpPrereqError("Cannot get current information"
3315 " from node '%s'" % nodeinfo)
3316 vg_free = info.get('vg_free', None)
3317 if not isinstance(vg_free, int):
3318 raise errors.OpPrereqError("Can't compute free disk space on"
3320 if req_size > info['vg_free']:
3321 raise errors.OpPrereqError("Not enough disk space on target node %s."
3322 " %d MB available, %d MB required" %
3323 (node, info['vg_free'], req_size))
3326 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3328 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3329 " primary node" % self.op.os_type)
3331 if self.op.kernel_path == constants.VALUE_NONE:
3332 raise errors.OpPrereqError("Can't set instance kernel to none")
3335 # bridge check on primary node
3336 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3337 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3338 " destination node '%s'" %
3339 (self.op.bridge, pnode.name))
3341 # hvm_cdrom_image_path verification
3342 if self.op.hvm_cdrom_image_path is not None:
3343 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3344 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3345 " be an absolute path or None, not %s" %
3346 self.op.hvm_cdrom_image_path)
3347 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3348 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3349 " regular file or a symlink pointing to"
3350 " an existing regular file, not %s" %
3351 self.op.hvm_cdrom_image_path)
3353 # vnc_bind_address verification
3354 if self.op.vnc_bind_address is not None:
3355 if not utils.IsValidIP(self.op.vnc_bind_address):
3356 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3357 " like a valid IP address" %
3358 self.op.vnc_bind_address)
3361 self.instance_status = 'up'
3363 self.instance_status = 'down'
3365 def Exec(self, feedback_fn):
3366 """Create and add the instance to the cluster.
3369 instance = self.op.instance_name
3370 pnode_name = self.pnode.name
3372 if self.op.mac == "auto":
3373 mac_address = self.cfg.GenerateMAC()
3375 mac_address = self.op.mac
3377 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3378 if self.inst_ip is not None:
3379 nic.ip = self.inst_ip
3381 ht_kind = self.sstore.GetHypervisorType()
3382 if ht_kind in constants.HTS_REQ_PORT:
3383 network_port = self.cfg.AllocatePort()
3387 if self.op.vnc_bind_address is None:
3388 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3390 disks = _GenerateDiskTemplate(self.cfg,
3391 self.op.disk_template,
3392 instance, pnode_name,
3393 self.secondaries, self.op.disk_size,
3396 iobj = objects.Instance(name=instance, os=self.op.os_type,
3397 primary_node=pnode_name,
3398 memory=self.op.mem_size,
3399 vcpus=self.op.vcpus,
3400 nics=[nic], disks=disks,
3401 disk_template=self.op.disk_template,
3402 status=self.instance_status,
3403 network_port=network_port,
3404 kernel_path=self.op.kernel_path,
3405 initrd_path=self.op.initrd_path,
3406 hvm_boot_order=self.op.hvm_boot_order,
3407 hvm_acpi=self.op.hvm_acpi,
3408 hvm_pae=self.op.hvm_pae,
3409 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3410 vnc_bind_address=self.op.vnc_bind_address,
3413 feedback_fn("* creating instance disks...")
3414 if not _CreateDisks(self.cfg, iobj):
3415 _RemoveDisks(iobj, self.cfg)
3416 raise errors.OpExecError("Device creation failed, reverting...")
3418 feedback_fn("adding instance %s to cluster config" % instance)
3420 self.cfg.AddInstance(iobj)
3422 if self.op.wait_for_sync:
3423 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3424 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3425 # make sure the disks are not degraded (still sync-ing is ok)
3427 feedback_fn("* checking mirrors status")
3428 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3433 _RemoveDisks(iobj, self.cfg)
3434 self.cfg.RemoveInstance(iobj.name)
3435 raise errors.OpExecError("There are some degraded disks for"
3438 feedback_fn("creating os for instance %s on node %s" %
3439 (instance, pnode_name))
3441 if iobj.disk_template != constants.DT_DISKLESS:
3442 if self.op.mode == constants.INSTANCE_CREATE:
3443 feedback_fn("* running the instance OS create scripts...")
3444 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3445 raise errors.OpExecError("could not add os for instance %s"
3447 (instance, pnode_name))
3449 elif self.op.mode == constants.INSTANCE_IMPORT:
3450 feedback_fn("* running the instance OS import scripts...")
3451 src_node = self.op.src_node
3452 src_image = self.src_image
3453 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3454 src_node, src_image):
3455 raise errors.OpExecError("Could not import os for instance"
3457 (instance, pnode_name))
3459 # also checked in the prereq part
3460 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3464 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3465 feedback_fn("* starting instance...")
3466 if not rpc.call_instance_start(pnode_name, iobj, None):
3467 raise errors.OpExecError("Could not start instance")
3470 class LUConnectConsole(NoHooksLU):
3471 """Connect to an instance's console.
3473 This is somewhat special in that it returns the command line that
3474 you need to run on the master node in order to connect to the console.
3478 _OP_REQP = ["instance_name"]
3480 def CheckPrereq(self):
3481 """Check prerequisites.
3483 This checks that the instance is in the cluster.
3486 instance = self.cfg.GetInstanceInfo(
3487 self.cfg.ExpandInstanceName(self.op.instance_name))
3488 if instance is None:
3489 raise errors.OpPrereqError("Instance '%s' not known" %
3490 self.op.instance_name)
3491 self.instance = instance
3493 def Exec(self, feedback_fn):
3494 """Connect to the console of an instance
3497 instance = self.instance
3498 node = instance.primary_node
3500 node_insts = rpc.call_instance_list([node])[node]
3501 if node_insts is False:
3502 raise errors.OpExecError("Can't connect to node %s." % node)
3504 if instance.name not in node_insts:
3505 raise errors.OpExecError("Instance %s is not running." % instance.name)
3507 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3509 hyper = hypervisor.GetHypervisor()
3510 console_cmd = hyper.GetShellCommandForConsole(instance)
3512 argv = ["ssh", "-q", "-t"]
3513 argv.extend(ssh.KNOWN_HOSTS_OPTS)
3514 argv.extend(ssh.BATCH_MODE_OPTS)
3516 argv.append(console_cmd)
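# Illustrative note (not part of the original code): the resulting command
# line is meant to be run on the master node; the exact ssh options and
# console command depend on the configured hypervisor, so the values below
# are assumptions:
#
#   ssh -q -t [known-hosts/batch-mode options] node1.example.com \
#       '<hypervisor console command for inst1.example.com>'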
3520 class LUAddMDDRBDComponent(LogicalUnit):
3521 """Adda new mirror member to an instance's disk.
3524 HPATH = "mirror-add"
3525 HTYPE = constants.HTYPE_INSTANCE
3526 _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3528 def BuildHooksEnv(self):
3531 This runs on the master, the primary and all the secondaries.
3535 "NEW_SECONDARY": self.op.remote_node,
3536 "DISK_NAME": self.op.disk_name,
3538 env.update(_BuildInstanceHookEnvByObject(self.instance))
3539 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3540 self.op.remote_node,] + list(self.instance.secondary_nodes)
3543 def CheckPrereq(self):
3544 """Check prerequisites.
3546 This checks that the instance is in the cluster.
3549 instance = self.cfg.GetInstanceInfo(
3550 self.cfg.ExpandInstanceName(self.op.instance_name))
3551 if instance is None:
3552 raise errors.OpPrereqError("Instance '%s' not known" %
3553 self.op.instance_name)
3554 self.instance = instance
3556 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3557 if remote_node is None:
3558 raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3559 self.remote_node = remote_node
3561 if remote_node == instance.primary_node:
3562 raise errors.OpPrereqError("The specified node is the primary node of"
3565 if instance.disk_template != constants.DT_REMOTE_RAID1:
3566 raise errors.OpPrereqError("Instance's disk layout is not"
3568 for disk in instance.disks:
3569 if disk.iv_name == self.op.disk_name:
3572 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3573 " instance." % self.op.disk_name)
3574 if len(disk.children) > 1:
3575 raise errors.OpPrereqError("The device already has two slave devices."
3576 " This would create a 3-disk raid1 which we"
3580 def Exec(self, feedback_fn):
3581 """Add the mirror component
3585 instance = self.instance
3587 remote_node = self.remote_node
3588 lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3589 names = _GenerateUniqueNames(self.cfg, lv_names)
3590 new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3591 remote_node, disk.size, names)
3593 logger.Info("adding new mirror component on secondary")
3595 if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3597 _GetInstanceInfoText(instance)):
3598 raise errors.OpExecError("Failed to create new component on secondary"
3599 " node %s" % remote_node)
3601 logger.Info("adding new mirror component on primary")
3603 if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3605 _GetInstanceInfoText(instance)):
3606 # remove secondary dev
3607 self.cfg.SetDiskID(new_drbd, remote_node)
3608 rpc.call_blockdev_remove(remote_node, new_drbd)
3609 raise errors.OpExecError("Failed to create volume on primary")
3611 # the device exists now
3612 # call the primary node to add the mirror to md
3613 logger.Info("adding new mirror component to md")
3614 if not rpc.call_blockdev_addchildren(instance.primary_node,
3616 logger.Error("Can't add mirror component to md!")
3617 self.cfg.SetDiskID(new_drbd, remote_node)
3618 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3619 logger.Error("Can't rollback on secondary")
3620 self.cfg.SetDiskID(new_drbd, instance.primary_node)
3621 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3622 logger.Error("Can't rollback on primary")
3623 raise errors.OpExecError("Can't add mirror component to md array")
3625 disk.children.append(new_drbd)
3627 self.cfg.AddInstance(instance)
3629 _WaitForSync(self.cfg, instance, self.proc)
3634 class LURemoveMDDRBDComponent(LogicalUnit):
3635 """Remove a component from a remote_raid1 disk.
3638 HPATH = "mirror-remove"
3639 HTYPE = constants.HTYPE_INSTANCE
3640 _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3642 def BuildHooksEnv(self):
3645 This runs on the master, the primary and all the secondaries.
3649 "DISK_NAME": self.op.disk_name,
3650 "DISK_ID": self.op.disk_id,
3651 "OLD_SECONDARY": self.old_secondary,
3653 env.update(_BuildInstanceHookEnvByObject(self.instance))
3654 nl = [self.sstore.GetMasterNode(),
3655 self.instance.primary_node] + list(self.instance.secondary_nodes)
3658 def CheckPrereq(self):
3659 """Check prerequisites.
3661 This checks that the instance is in the cluster.
3664 instance = self.cfg.GetInstanceInfo(
3665 self.cfg.ExpandInstanceName(self.op.instance_name))
3666 if instance is None:
3667 raise errors.OpPrereqError("Instance '%s' not known" %
3668 self.op.instance_name)
3669 self.instance = instance
3671 if instance.disk_template != constants.DT_REMOTE_RAID1:
3672 raise errors.OpPrereqError("Instance's disk layout is not"
3674 for disk in instance.disks:
3675 if disk.iv_name == self.op.disk_name:
3678 raise errors.OpPrereqError("Can't find this device ('%s') in the"
3679 " instance." % self.op.disk_name)
3680 for child in disk.children:
3681 if (child.dev_type == constants.LD_DRBD7 and
3682 child.logical_id[2] == self.op.disk_id):
3685 raise errors.OpPrereqError("Can't find the device with this port.")
3687 if len(disk.children) < 2:
3688 raise errors.OpPrereqError("Cannot remove the last component from"
3692 if self.child.logical_id[0] == instance.primary_node:
3696 self.old_secondary = self.child.logical_id[oid]
3698 def Exec(self, feedback_fn):
3699 """Remove the mirror component
3702 instance = self.instance
3705 logger.Info("remove mirror component")
3706 self.cfg.SetDiskID(disk, instance.primary_node)
3707 if not rpc.call_blockdev_removechildren(instance.primary_node,
3709 raise errors.OpExecError("Can't remove child from mirror.")
3711 for node in child.logical_id[:2]:
3712 self.cfg.SetDiskID(child, node)
3713 if not rpc.call_blockdev_remove(node, child):
3714 logger.Error("Warning: failed to remove device from node %s,"
3715 " continuing operation." % node)
3717 disk.children.remove(child)
3718 self.cfg.AddInstance(instance)
3721 class LUReplaceDisks(LogicalUnit):
3722 """Replace the disks of an instance.
3725 HPATH = "mirrors-replace"
3726 HTYPE = constants.HTYPE_INSTANCE
3727 _OP_REQP = ["instance_name", "mode", "disks"]
3729 def _RunAllocator(self):
3730 """Compute a new secondary node using an IAllocator.
3733 ial = IAllocator(self.cfg, self.sstore,
3734 mode=constants.IALLOCATOR_MODE_RELOC,
3735 name=self.op.instance_name,
3736 relocate_from=[self.sec_node])
3738 ial.Run(self.op.iallocator)
3741 raise errors.OpPrereqError("Can't compute nodes using"
3742 " iallocator '%s': %s" % (self.op.iallocator,
3744 if len(ial.nodes) != ial.required_nodes:
3745 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3746 " of nodes (%s), required %s" %
3747 (len(ial.nodes), ial.required_nodes))
3748 self.op.remote_node = ial.nodes[0]
3749 logger.ToStdout("Selected new secondary for the instance: %s" %
3750 self.op.remote_node)
3752 def BuildHooksEnv(self):
3755 This runs on the master, the primary and all the secondaries.
3759 "MODE": self.op.mode,
3760 "NEW_SECONDARY": self.op.remote_node,
3761 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3763 env.update(_BuildInstanceHookEnvByObject(self.instance))
3765 self.sstore.GetMasterNode(),
3766 self.instance.primary_node,
3768 if self.op.remote_node is not None:
3769 nl.append(self.op.remote_node)
3772 def CheckPrereq(self):
3773 """Check prerequisites.
3775 This checks that the instance is in the cluster.
3778 if not hasattr(self.op, "remote_node"):
3779 self.op.remote_node = None
3781 instance = self.cfg.GetInstanceInfo(
3782 self.cfg.ExpandInstanceName(self.op.instance_name))
3783 if instance is None:
3784 raise errors.OpPrereqError("Instance '%s' not known" %
3785 self.op.instance_name)
3786 self.instance = instance
3787 self.op.instance_name = instance.name
3789 if instance.disk_template not in constants.DTS_NET_MIRROR:
3790 raise errors.OpPrereqError("Instance's disk layout is not"
3791 " network mirrored.")
3793 if len(instance.secondary_nodes) != 1:
3794 raise errors.OpPrereqError("The instance has a strange layout,"
3795 " expected one secondary but found %d" %
3796 len(instance.secondary_nodes))
3798 self.sec_node = instance.secondary_nodes[0]
3800 ia_name = getattr(self.op, "iallocator", None)
3801 if ia_name is not None:
3802 if self.op.remote_node is not None:
3803 raise errors.OpPrereqError("Give either the iallocator or the new"
3804 " secondary, not both")
3805 self.op.remote_node = self._RunAllocator()
3807 remote_node = self.op.remote_node
3808 if remote_node is not None:
3809 remote_node = self.cfg.ExpandNodeName(remote_node)
3810 if remote_node is None:
3811 raise errors.OpPrereqError("Node '%s' not known" %
3812 self.op.remote_node)
3813 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3815 self.remote_node_info = None
3816 if remote_node == instance.primary_node:
3817 raise errors.OpPrereqError("The specified node is the primary node of"
3819 elif remote_node == self.sec_node:
3820 if self.op.mode == constants.REPLACE_DISK_SEC:
3821 # this is for DRBD8, where we can't execute the same mode of
3822 # replacement as for drbd7 (no different port allocated)
3823 raise errors.OpPrereqError("Same secondary given, cannot execute"
3825 # the user gave the current secondary, switch to
3826 # 'no-replace-secondary' mode for drbd7
3828 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3829 self.op.mode != constants.REPLACE_DISK_ALL):
3830 raise errors.OpPrereqError("Template 'remote_raid1' only allows replacing"
3831 " all disks at once, not individual ones")
3832 if instance.disk_template == constants.DT_DRBD8:
3833 if (self.op.mode == constants.REPLACE_DISK_ALL and
3834 remote_node is not None):
3835 # switch to replace secondary mode
3836 self.op.mode = constants.REPLACE_DISK_SEC
3838 if self.op.mode == constants.REPLACE_DISK_ALL:
3839 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3840 " secondary disk replacement, not"
3842 elif self.op.mode == constants.REPLACE_DISK_PRI:
3843 if remote_node is not None:
3844 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3845 " the secondary while doing a primary"
3846 " node disk replacement")
3847 self.tgt_node = instance.primary_node
3848 self.oth_node = instance.secondary_nodes[0]
3849 elif self.op.mode == constants.REPLACE_DISK_SEC:
3850 self.new_node = remote_node # this can be None, in which case
3851 # we don't change the secondary
3852 self.tgt_node = instance.secondary_nodes[0]
3853 self.oth_node = instance.primary_node
3855 raise errors.ProgrammerError("Unhandled disk replace mode")
3857 for name in self.op.disks:
3858 if instance.FindDisk(name) is None:
3859 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3860 (name, instance.name))
3861 self.op.remote_node = remote_node
3863 def _ExecRR1(self, feedback_fn):
3864 """Replace the disks of an instance.
3867 instance = self.instance
3870 if self.op.remote_node is None:
3871 remote_node = self.sec_node
3873 remote_node = self.op.remote_node
3875 for dev in instance.disks:
3877 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3878 names = _GenerateUniqueNames(cfg, lv_names)
3879 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3880 remote_node, size, names)
3881 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3882 logger.Info("adding new mirror component on secondary for %s" %
3885 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3887 _GetInstanceInfoText(instance)):
3888 raise errors.OpExecError("Failed to create new component on secondary"
3889 " node %s. Full abort, cleanup manually!" %
3892 logger.Info("adding new mirror component on primary")
3894 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3896 _GetInstanceInfoText(instance)):
3897 # remove secondary dev
3898 cfg.SetDiskID(new_drbd, remote_node)
3899 rpc.call_blockdev_remove(remote_node, new_drbd)
3900 raise errors.OpExecError("Failed to create volume on primary!"
3901 " Full abort, cleanup manually!!")
3903 # the device exists now
3904 # call the primary node to add the mirror to md
3905 logger.Info("adding new mirror component to md")
3906 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3908 logger.Error("Can't add mirror component to md!")
3909 cfg.SetDiskID(new_drbd, remote_node)
3910 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3911 logger.Error("Can't rollback on secondary")
3912 cfg.SetDiskID(new_drbd, instance.primary_node)
3913 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3914 logger.Error("Can't rollback on primary")
3915 raise errors.OpExecError("Full abort, clean up manually!!")
3917 dev.children.append(new_drbd)
3918 cfg.AddInstance(instance)
3920 # this can fail as the old devices are degraded and _WaitForSync
3921 # does a combined result over all disks, so we don't check its
3923 _WaitForSync(cfg, instance, self.proc, unlock=True)
3925 # so check manually all the devices
3926 for name in iv_names:
3927 dev, child, new_drbd = iv_names[name]
3928 cfg.SetDiskID(dev, instance.primary_node)
3929 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3931 raise errors.OpExecError("MD device %s is degraded!" % name)
3932 cfg.SetDiskID(new_drbd, instance.primary_node)
3933 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3935 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3937 for name in iv_names:
3938 dev, child, new_drbd = iv_names[name]
3939 logger.Info("remove mirror %s component" % name)
3940 cfg.SetDiskID(dev, instance.primary_node)
3941 if not rpc.call_blockdev_removechildren(instance.primary_node,
3943 logger.Error("Can't remove child from mirror, aborting"
3944 " *this device cleanup*.\nYou need to cleanup manually!!")
3947 for node in child.logical_id[:2]:
3948 logger.Info("remove child device on %s" % node)
3949 cfg.SetDiskID(child, node)
3950 if not rpc.call_blockdev_remove(node, child):
3951 logger.Error("Warning: failed to remove device from node %s,"
3952 " continuing operation." % node)
3954 dev.children.remove(child)
3956 cfg.AddInstance(instance)
3958 def _ExecD8DiskOnly(self, feedback_fn):
3959 """Replace a disk on the primary or secondary for dbrd8.
3961 The algorithm for replace is quite complicated:
3962 - for each disk to be replaced:
3963 - create new LVs on the target node with unique names
3964 - detach old LVs from the drbd device
3965 - rename old LVs to name_replaced.<time_t>
3966 - rename new LVs to old LVs
3967 - attach the new LVs (with the old names now) to the drbd device
3968 - wait for sync across all devices
3969 - for each modified disk:
3970 - remove old LVs (which have the name name_replaced.<time_t>)
3972 Failures are not very well handled.
3976 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3977 instance = self.instance
3979 vgname = self.cfg.GetVGName()
3982 tgt_node = self.tgt_node
3983 oth_node = self.oth_node
3985 # Step: check device activation
3986 self.proc.LogStep(1, steps_total, "check device existence")
3987 info("checking volume groups")
3988 my_vg = cfg.GetVGName()
3989 results = rpc.call_vg_list([oth_node, tgt_node])
3991 raise errors.OpExecError("Can't list volume groups on the nodes")
3992 for node in oth_node, tgt_node:
3993 res = results.get(node, False)
3994 if not res or my_vg not in res:
3995 raise errors.OpExecError("Volume group '%s' not found on %s" %
3997 for dev in instance.disks:
3998 if not dev.iv_name in self.op.disks:
4000 for node in tgt_node, oth_node:
4001 info("checking %s on %s" % (dev.iv_name, node))
4002 cfg.SetDiskID(dev, node)
4003 if not rpc.call_blockdev_find(node, dev):
4004 raise errors.OpExecError("Can't find device %s on node %s" %
4005 (dev.iv_name, node))
4007 # Step: check other node consistency
4008 self.proc.LogStep(2, steps_total, "check peer consistency")
4009 for dev in instance.disks:
4010 if not dev.iv_name in self.op.disks:
4012 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
4013 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
4014 oth_node==instance.primary_node):
4015 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4016 " to replace disks on this node (%s)" %
4017 (oth_node, tgt_node))
4019 # Step: create new storage
4020 self.proc.LogStep(3, steps_total, "allocate new storage")
4021 for dev in instance.disks:
4022 if not dev.iv_name in self.op.disks:
4025 cfg.SetDiskID(dev, tgt_node)
4026 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
4027 names = _GenerateUniqueNames(cfg, lv_names)
4028 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4029 logical_id=(vgname, names[0]))
4030 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4031 logical_id=(vgname, names[1]))
4032 new_lvs = [lv_data, lv_meta]
4033 old_lvs = dev.children
4034 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4035 info("creating new local storage on %s for %s" %
4036 (tgt_node, dev.iv_name))
4037 # since we *always* want to create this LV, we use the
4038 # _Create...OnPrimary (which forces the creation), even if we
4039 # are talking about the secondary node
4040 for new_lv in new_lvs:
4041 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
4042 _GetInstanceInfoText(instance)):
4043 raise errors.OpExecError("Failed to create new LV named '%s' on"
4045 (new_lv.logical_id[1], tgt_node))
4047 # Step: for each lv, detach+rename*2+attach
4048 self.proc.LogStep(4, steps_total, "change drbd configuration")
4049 for dev, old_lvs, new_lvs in iv_names.itervalues():
4050 info("detaching %s drbd from local storage" % dev.iv_name)
4051 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4052 raise errors.OpExecError("Can't detach drbd from local storage on node"
4053 " %s for device %s" % (tgt_node, dev.iv_name))
4055 #cfg.Update(instance)
4057 # ok, we created the new LVs, so now we know we have the needed
4058 # storage; as such, we proceed on the target node to rename
4059 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4060 # using the assumption that logical_id == physical_id (which in
4061 # turn is the unique_id on that node)
4063 # FIXME(iustin): use a better name for the replaced LVs
4064 temp_suffix = int(time.time())
4065 ren_fn = lambda d, suff: (d.physical_id[0],
4066 d.physical_id[1] + "_replaced-%s" % suff)
4067 # build the rename list based on what LVs exist on the node
4069 for to_ren in old_lvs:
4070 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
4071 if find_res is not None: # device exists
4072 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4074 info("renaming the old LVs on the target node")
4075 if not rpc.call_blockdev_rename(tgt_node, rlist):
4076 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4077 # now we rename the new LVs to the old LVs
4078 info("renaming the new LVs on the target node")
4079 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4080 if not rpc.call_blockdev_rename(tgt_node, rlist):
4081 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4083 for old, new in zip(old_lvs, new_lvs):
4084 new.logical_id = old.logical_id
4085 cfg.SetDiskID(new, tgt_node)
4087 for disk in old_lvs:
4088 disk.logical_id = ren_fn(disk, temp_suffix)
4089 cfg.SetDiskID(disk, tgt_node)
4091 # now that the new lvs have the old name, we can add them to the device
4092 info("adding new mirror component on %s" % tgt_node)
4093 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4094 for new_lv in new_lvs:
4095 if not rpc.call_blockdev_remove(tgt_node, new_lv):
4096 warning("Can't rollback device %s", hint="manually cleanup unused"
4098 raise errors.OpExecError("Can't add local storage to drbd")
4100 dev.children = new_lvs
4101 cfg.Update(instance)
4103 # Step: wait for sync
4105 # this can fail as the old devices are degraded and _WaitForSync
4106 # does a combined result over all disks, so we don't check its return value
4108 self.proc.LogStep(5, steps_total, "sync devices")
4109 _WaitForSync(cfg, instance, self.proc, unlock=True)
4111 # so check manually all the devices
4112 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4113 cfg.SetDiskID(dev, instance.primary_node)
4114 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4116 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4118 # Step: remove old storage
4119 self.proc.LogStep(6, steps_total, "removing old storage")
4120 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4121 info("remove logical volumes for %s" % name)
4123 cfg.SetDiskID(lv, tgt_node)
4124 if not rpc.call_blockdev_remove(tgt_node, lv):
4125 warning("Can't remove old LV", hint="manually remove unused LVs")
4128 def _ExecD8Secondary(self, feedback_fn):
4129 """Replace the secondary node for drbd8.
4131 The algorithm for replace is quite complicated:
4132 - for all disks of the instance:
4133 - create new LVs on the new node with same names
4134 - shutdown the drbd device on the old secondary
4135 - disconnect the drbd network on the primary
4136 - create the drbd device on the new secondary
4137 - network attach the drbd on the primary, using an artifice:
4138 the drbd code for Attach() will connect to the network if it
4139 finds a device which is connected to the good local disks but not network enabled
4141 - wait for sync across all devices
4142 - remove all disks from the old secondary
4144 Failures are not very well handled.
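# Editorial note (illustration only, not original code): the actual node
# switch happens by rewriting each disk's logical_id further below, roughly
#   dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
# i.e. only the (primary, secondary) node pair changes while the remaining
# identifiers (e.g. the DRBD port) are preserved; a subsequent blockdev_find
# on the primary then re-attaches the network using the new peer.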
4148 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4149 instance = self.instance
4151 vgname = self.cfg.GetVGName()
4154 old_node = self.tgt_node
4155 new_node = self.new_node
4156 pri_node = instance.primary_node
4158 # Step: check device activation
4159 self.proc.LogStep(1, steps_total, "check device existence")
4160 info("checking volume groups")
4161 my_vg = cfg.GetVGName()
4162 results = rpc.call_vg_list([pri_node, new_node])
4164 raise errors.OpExecError("Can't list volume groups on the nodes")
4165 for node in pri_node, new_node:
4166 res = results.get(node, False)
4167 if not res or my_vg not in res:
4168 raise errors.OpExecError("Volume group '%s' not found on %s" %
4170 for dev in instance.disks:
4171 if not dev.iv_name in self.op.disks:
4173 info("checking %s on %s" % (dev.iv_name, pri_node))
4174 cfg.SetDiskID(dev, pri_node)
4175 if not rpc.call_blockdev_find(pri_node, dev):
4176 raise errors.OpExecError("Can't find device %s on node %s" %
4177 (dev.iv_name, pri_node))
4179 # Step: check other node consistency
4180 self.proc.LogStep(2, steps_total, "check peer consistency")
4181 for dev in instance.disks:
4182 if not dev.iv_name in self.op.disks:
4184 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4185 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4186 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4187 " unsafe to replace the secondary" %
4190 # Step: create new storage
4191 self.proc.LogStep(3, steps_total, "allocate new storage")
4192 for dev in instance.disks:
4194 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4195 # since we *always* want to create this LV, we use the
4196 # _Create...OnPrimary (which forces the creation), even if we
4197 # are talking about the secondary node
4198 for new_lv in dev.children:
4199 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4200 _GetInstanceInfoText(instance)):
4201 raise errors.OpExecError("Failed to create new LV named '%s' on"
4203 (new_lv.logical_id[1], new_node))
4205 iv_names[dev.iv_name] = (dev, dev.children)
4207 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4208 for dev in instance.disks:
4210 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4211 # create new devices on new_node
4212 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4213 logical_id=(pri_node, new_node,
4215 children=dev.children)
4216 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4218 _GetInstanceInfoText(instance)):
4219 raise errors.OpExecError("Failed to create new DRBD on"
4220 " node '%s'" % new_node)
4222 for dev in instance.disks:
4223 # we have new devices, shutdown the drbd on the old secondary
4224 info("shutting down drbd for %s on old node" % dev.iv_name)
4225 cfg.SetDiskID(dev, old_node)
4226 if not rpc.call_blockdev_shutdown(old_node, dev):
4227 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4228 hint="Please cleanup this device manually as soon as possible")
4230 info("detaching primary drbds from the network (=> standalone)")
4232 for dev in instance.disks:
4233 cfg.SetDiskID(dev, pri_node)
4234 # set the physical (unique in bdev terms) id to None, meaning
4235 # detach from network
4236 dev.physical_id = (None,) * len(dev.physical_id)
4237 # and 'find' the device, which will 'fix' it to match the
4239 if rpc.call_blockdev_find(pri_node, dev):
4242 warning("Failed to detach drbd %s from network, unusual case" %
4246 # no detaches succeeded (very unlikely)
4247 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4249 # if we managed to detach at least one, we update all the disks of
4250 # the instance to point to the new secondary
4251 info("updating instance configuration")
4252 for dev in instance.disks:
4253 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4254 cfg.SetDiskID(dev, pri_node)
4255 cfg.Update(instance)
4257 # and now perform the drbd attach
4258 info("attaching primary drbds to new secondary (standalone => connected)")
4260 for dev in instance.disks:
4261 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4262 # since the attach is smart, it's enough to 'find' the device,
4263 # it will automatically activate the network, if the physical_id
4265 cfg.SetDiskID(dev, pri_node)
4266 if not rpc.call_blockdev_find(pri_node, dev):
4267 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4268 "please do a gnt-instance info to see the status of disks")
4270 # this can fail as the old devices are degraded and _WaitForSync
4271 # does a combined result over all disks, so we don't check its return value
4273 self.proc.LogStep(5, steps_total, "sync devices")
4274 _WaitForSync(cfg, instance, self.proc, unlock=True)
4276 # so check manually all the devices
4277 for name, (dev, old_lvs) in iv_names.iteritems():
4278 cfg.SetDiskID(dev, pri_node)
4279 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4281 raise errors.OpExecError("DRBD device %s is degraded!" % name)
4283 self.proc.LogStep(6, steps_total, "removing old storage")
4284 for name, (dev, old_lvs) in iv_names.iteritems():
4285 info("remove logical volumes for %s" % name)
4287 cfg.SetDiskID(lv, old_node)
4288 if not rpc.call_blockdev_remove(old_node, lv):
4289 warning("Can't remove LV on old secondary",
4290 hint="Cleanup stale volumes by hand")
4292 def Exec(self, feedback_fn):
4293 """Execute disk replacement.
4295 This dispatches the disk replacement to the appropriate handler.
4298 instance = self.instance
4299 if instance.disk_template == constants.DT_REMOTE_RAID1:
4301 elif instance.disk_template == constants.DT_DRBD8:
4302 if self.op.remote_node is None:
4303 fn = self._ExecD8DiskOnly
4305 fn = self._ExecD8Secondary
4307 raise errors.ProgrammerError("Unhandled disk replacement case")
4308 return fn(feedback_fn)
4311 class LUQueryInstanceData(NoHooksLU):
4312 """Query runtime instance data.
4315 _OP_REQP = ["instances"]
4317 def CheckPrereq(self):
4318 """Check prerequisites.
4320 This only checks the optional instance list against the existing names.
4323 if not isinstance(self.op.instances, list):
4324 raise errors.OpPrereqError("Invalid argument type 'instances'")
4325 if self.op.instances:
4326 self.wanted_instances = []
4327 names = self.op.instances
4329 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4330 if instance is None:
4331 raise errors.OpPrereqError("No such instance name '%s'" % name)
4332 self.wanted_instances.append(instance)
4334 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4335 in self.cfg.GetInstanceList()]
4339 def _ComputeDiskStatus(self, instance, snode, dev):
4340 """Compute block device status.
4343 self.cfg.SetDiskID(dev, instance.primary_node)
4344 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4345 if dev.dev_type in constants.LDS_DRBD:
4346 # we change the snode then (otherwise we use the one passed in)
4347 if dev.logical_id[0] == instance.primary_node:
4348 snode = dev.logical_id[1]
4350 snode = dev.logical_id[0]
4353 self.cfg.SetDiskID(dev, snode)
4354 dev_sstatus = rpc.call_blockdev_find(snode, dev)
4359 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4360 for child in dev.children]
4365 "iv_name": dev.iv_name,
4366 "dev_type": dev.dev_type,
4367 "logical_id": dev.logical_id,
4368 "physical_id": dev.physical_id,
4369 "pstatus": dev_pstatus,
4370 "sstatus": dev_sstatus,
4371 "children": dev_children,
4376 def Exec(self, feedback_fn):
4377 """Gather and return data"""
4379 for instance in self.wanted_instances:
4380 remote_info = rpc.call_instance_info(instance.primary_node,
4382 if remote_info and "state" in remote_info:
4385 remote_state = "down"
4386 if instance.status == "down":
4387 config_state = "down"
4391 disks = [self._ComputeDiskStatus(instance, None, device)
4392 for device in instance.disks]
4395 "name": instance.name,
4396 "config_state": config_state,
4397 "run_state": remote_state,
4398 "pnode": instance.primary_node,
4399 "snodes": instance.secondary_nodes,
4401 "memory": instance.memory,
4402 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4404 "vcpus": instance.vcpus,
4407 htkind = self.sstore.GetHypervisorType()
4408 if htkind == constants.HT_XEN_PVM30:
4409 idict["kernel_path"] = instance.kernel_path
4410 idict["initrd_path"] = instance.initrd_path
4412 if htkind == constants.HT_XEN_HVM31:
4413 idict["hvm_boot_order"] = instance.hvm_boot_order
4414 idict["hvm_acpi"] = instance.hvm_acpi
4415 idict["hvm_pae"] = instance.hvm_pae
4416 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4418 if htkind in constants.HTS_REQ_PORT:
4419 idict["vnc_bind_address"] = instance.vnc_bind_address
4420 idict["network_port"] = instance.network_port
4422 result[instance.name] = idict
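# Hedged summary of the per-instance data returned by this LU, based on the
# keys filled in above (values are illustrative only):
#   result["inst1.example.com"] = {
#     "name": ..., "config_state": "up"/"down", "run_state": ...,
#     "pnode": ..., "snodes": [...], "memory": ..., "vcpus": ...,
#     "nics": [(mac, ip, bridge), ...],
#     plus the per-disk status computed by _ComputeDiskStatus and, where
#     applicable, hypervisor-specific keys (kernel_path, hvm_*,
#     vnc_bind_address, network_port).
#   }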
4427 class LUSetInstanceParms(LogicalUnit):
4428 """Modifies an instances's parameters.
4431 HPATH = "instance-modify"
4432 HTYPE = constants.HTYPE_INSTANCE
4433 _OP_REQP = ["instance_name"]
4435 def BuildHooksEnv(self):
4438 This runs on the master, primary and secondaries.
4443 args['memory'] = self.mem
4445 args['vcpus'] = self.vcpus
4446 if self.do_ip or self.do_bridge or self.mac:
4450 ip = self.instance.nics[0].ip
4452 bridge = self.bridge
4454 bridge = self.instance.nics[0].bridge
4458 mac = self.instance.nics[0].mac
4459 args['nics'] = [(ip, bridge, mac)]
4460 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4461 nl = [self.sstore.GetMasterNode(),
4462 self.instance.primary_node] + list(self.instance.secondary_nodes)
4465 def CheckPrereq(self):
4466 """Check prerequisites.
4468 This only checks the instance list against the existing names.
4471 self.mem = getattr(self.op, "mem", None)
4472 self.vcpus = getattr(self.op, "vcpus", None)
4473 self.ip = getattr(self.op, "ip", None)
4474 self.mac = getattr(self.op, "mac", None)
4475 self.bridge = getattr(self.op, "bridge", None)
4476 self.kernel_path = getattr(self.op, "kernel_path", None)
4477 self.initrd_path = getattr(self.op, "initrd_path", None)
4478 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4479 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4480 self.hvm_pae = getattr(self.op, "hvm_pae", None)
4481 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4482 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4483 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4484 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4485 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4486 self.vnc_bind_address]
4487 if all_parms.count(None) == len(all_parms):
4488 raise errors.OpPrereqError("No changes submitted")
4489 if self.mem is not None:
4491 self.mem = int(self.mem)
4492 except ValueError, err:
4493 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4494 if self.vcpus is not None:
4496 self.vcpus = int(self.vcpus)
4497 except ValueError, err:
4498 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4499 if self.ip is not None:
4501 if self.ip.lower() == "none":
4504 if not utils.IsValidIP(self.ip):
4505 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4508 self.do_bridge = (self.bridge is not None)
4509 if self.mac is not None:
4510 if self.cfg.IsMacInUse(self.mac):
4511 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4513 if not utils.IsValidMac(self.mac):
4514 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4516 if self.kernel_path is not None:
4517 self.do_kernel_path = True
4518 if self.kernel_path == constants.VALUE_NONE:
4519 raise errors.OpPrereqError("Can't set instance to no kernel")
4521 if self.kernel_path != constants.VALUE_DEFAULT:
4522 if not os.path.isabs(self.kernel_path):
4523 raise errors.OpPrereqError("The kernel path must be an absolute"
4526 self.do_kernel_path = False
4528 if self.initrd_path is not None:
4529 self.do_initrd_path = True
4530 if self.initrd_path not in (constants.VALUE_NONE,
4531 constants.VALUE_DEFAULT):
4532 if not os.path.isabs(self.initrd_path):
4533 raise errors.OpPrereqError("The initrd path must be an absolute"
4536 self.do_initrd_path = False
4538 # boot order verification
4539 if self.hvm_boot_order is not None:
4540 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4541 if len(self.hvm_boot_order.strip("acdn")) != 0:
4542 raise errors.OpPrereqError("invalid boot order specified,"
4543 " must be one or more of [acdn]"
4546 # hvm_cdrom_image_path verification
4547 if self.op.hvm_cdrom_image_path is not None:
4548 if not os.path.isabs(self.op.hvm_cdrom_image_path):
4549 raise errors.OpPrereqError("The path to the HVM CDROM image must"
4550 " be an absolute path or None, not %s" %
4551 self.op.hvm_cdrom_image_path)
4552 if not os.path.isfile(self.op.hvm_cdrom_image_path):
4553 raise errors.OpPrereqError("The HVM CDROM image must either be a"
4554 " regular file or a symlink pointing to"
4555 " an existing regular file, not %s" %
4556 self.op.hvm_cdrom_image_path)
4558 # vnc_bind_address verification
4559 if self.op.vnc_bind_address is not None:
4560 if not utils.IsValidIP(self.op.vnc_bind_address):
4561 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4562 " like a valid IP address" %
4563 self.op.vnc_bind_address)
4565 instance = self.cfg.GetInstanceInfo(
4566 self.cfg.ExpandInstanceName(self.op.instance_name))
4567 if instance is None:
4568 raise errors.OpPrereqError("No such instance name '%s'" %
4569 self.op.instance_name)
4570 self.op.instance_name = instance.name
4571 self.instance = instance
4574 def Exec(self, feedback_fn):
4575 """Modifies an instance.
4577 All parameters take effect only at the next restart of the instance.
4580 instance = self.instance
4582 instance.memory = self.mem
4583 result.append(("mem", self.mem))
4585 instance.vcpus = self.vcpus
4586 result.append(("vcpus", self.vcpus))
4588 instance.nics[0].ip = self.ip
4589 result.append(("ip", self.ip))
4591 instance.nics[0].bridge = self.bridge
4592 result.append(("bridge", self.bridge))
4594 instance.nics[0].mac = self.mac
4595 result.append(("mac", self.mac))
4596 if self.do_kernel_path:
4597 instance.kernel_path = self.kernel_path
4598 result.append(("kernel_path", self.kernel_path))
4599 if self.do_initrd_path:
4600 instance.initrd_path = self.initrd_path
4601 result.append(("initrd_path", self.initrd_path))
4602 if self.hvm_boot_order:
4603 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4604 instance.hvm_boot_order = None
4606 instance.hvm_boot_order = self.hvm_boot_order
4607 result.append(("hvm_boot_order", self.hvm_boot_order))
4609 result.append(("hvm_acpi", self.hvm_acpi))
4611 result.append(("hvm_pae", self.hvm_pae))
4612 if self.hvm_cdrom_image_path:
4613 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4614 if self.vnc_bind_address:
4615 instance.vnc_bind_address = self.vnc_bind_address
4616 result.append(("vnc_bind_address", self.vnc_bind_address))
4618 self.cfg.AddInstance(instance)
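# Editorial note: the `result` list assembled above is a list of
# (parameter, new value) pairs built from the result.append() calls,
# e.g. (illustrative) [("mem", 512), ("vcpus", 2)].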
4623 class LUQueryExports(NoHooksLU):
4624 """Query the exports list
4629 def CheckPrereq(self):
4630 """Check that the nodelist contains only existing nodes.
4633 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4635 def Exec(self, feedback_fn):
4636 """Compute the list of all the exported system images.
4639 a dictionary with the structure node->(export-list)
4640 where export-list is a list of the instances exported on that node.
4644 return rpc.call_export_list(self.nodes)
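# Illustrative example of the expected result shape (hostnames made up):
#   {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
#    "node2.example.com": []}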
4647 class LUExportInstance(LogicalUnit):
4648 """Export an instance to an image in the cluster.
4651 HPATH = "instance-export"
4652 HTYPE = constants.HTYPE_INSTANCE
4653 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4655 def BuildHooksEnv(self):
4658 This will run on the master, primary node and target node.
4662 "EXPORT_NODE": self.op.target_node,
4663 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4665 env.update(_BuildInstanceHookEnvByObject(self.instance))
4666 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4667 self.op.target_node]
4670 def CheckPrereq(self):
4671 """Check prerequisites.
4673 This checks that the instance and node names are valid.
4676 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4677 self.instance = self.cfg.GetInstanceInfo(instance_name)
4678 if self.instance is None:
4679 raise errors.OpPrereqError("Instance '%s' not found" %
4680 self.op.instance_name)
4683 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4684 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4686 if self.dst_node is None:
4687 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4688 self.op.target_node)
4689 self.op.target_node = self.dst_node.name
4691 def Exec(self, feedback_fn):
4692 """Export an instance to an image in the cluster.
4695 instance = self.instance
4696 dst_node = self.dst_node
4697 src_node = instance.primary_node
4698 if self.op.shutdown:
4699 # shutdown the instance, but not the disks
4700 if not rpc.call_instance_shutdown(src_node, instance):
4701 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4702 (instance.name, src_node))
4704 vgname = self.cfg.GetVGName()
4709 for disk in instance.disks:
4710 if disk.iv_name == "sda":
4711 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4712 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4714 if not new_dev_name:
4715 logger.Error("could not snapshot block device %s on node %s" %
4716 (disk.logical_id[1], src_node))
4718 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4719 logical_id=(vgname, new_dev_name),
4720 physical_id=(vgname, new_dev_name),
4721 iv_name=disk.iv_name)
4722 snap_disks.append(new_dev)
4725 if self.op.shutdown and instance.status == "up":
4726 if not rpc.call_instance_start(src_node, instance, None):
4727 _ShutdownInstanceDisks(instance, self.cfg)
4728 raise errors.OpExecError("Could not start instance")
4730 # TODO: check for size
4732 for dev in snap_disks:
4733 if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4735 logger.Error("could not export block device %s from node"
4737 (dev.logical_id[1], src_node, dst_node.name))
4738 if not rpc.call_blockdev_remove(src_node, dev):
4739 logger.Error("could not remove snapshot block device %s from"
4740 " node %s" % (dev.logical_id[1], src_node))
4742 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4743 logger.Error("could not finalize export for instance %s on node %s" %
4744 (instance.name, dst_node.name))
4746 nodelist = self.cfg.GetNodeList()
4747 nodelist.remove(dst_node.name)
4749 # on one-node clusters nodelist will be empty after the removal
4750 # if we proceed, the backup would be removed because OpQueryExports
4751 # substitutes an empty list with the full cluster node list.
4753 op = opcodes.OpQueryExports(nodes=nodelist)
4754 exportlist = self.proc.ChainOpCode(op)
4755 for node in exportlist:
4756 if instance.name in exportlist[node]:
4757 if not rpc.call_export_remove(node, instance.name):
4758 logger.Error("could not remove older export for instance %s"
4759 " on node %s" % (instance.name, node))
4762 class LURemoveExport(NoHooksLU):
4763 """Remove exports related to the named instance.
4766 _OP_REQP = ["instance_name"]
4768 def CheckPrereq(self):
4769 """Check prerequisites.
4773 def Exec(self, feedback_fn):
4774 """Remove any export.
4777 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4778 # If the instance was not found we'll try with the name that was passed in.
4779 # This will only work if it was an FQDN, though.
4781 if not instance_name:
4783 instance_name = self.op.instance_name
4785 op = opcodes.OpQueryExports(nodes=[])
4786 exportlist = self.proc.ChainOpCode(op)
4788 for node in exportlist:
4789 if instance_name in exportlist[node]:
4791 if not rpc.call_export_remove(node, instance_name):
4792 logger.Error("could not remove export for instance %s"
4793 " on node %s" % (instance_name, node))
4795 if fqdn_warn and not found:
4796 feedback_fn("Export not found. If trying to remove an export belonging"
4797 " to a deleted instance please use its Fully Qualified"
4801 class TagsLU(NoHooksLU):
4804 This is an abstract class which is the parent of all the other tags LUs.
4807 def CheckPrereq(self):
4808 """Check prerequisites.
4811 if self.op.kind == constants.TAG_CLUSTER:
4812 self.target = self.cfg.GetClusterInfo()
4813 elif self.op.kind == constants.TAG_NODE:
4814 name = self.cfg.ExpandNodeName(self.op.name)
4816 raise errors.OpPrereqError("Invalid node name (%s)" %
4819 self.target = self.cfg.GetNodeInfo(name)
4820 elif self.op.kind == constants.TAG_INSTANCE:
4821 name = self.cfg.ExpandInstanceName(self.op.name)
4823 raise errors.OpPrereqError("Invalid instance name (%s)" %
4826 self.target = self.cfg.GetInstanceInfo(name)
4828 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4832 class LUGetTags(TagsLU):
4833 """Returns the tags of a given object.
4836 _OP_REQP = ["kind", "name"]
4838 def Exec(self, feedback_fn):
4839 """Returns the tag list.
4842 return self.target.GetTags()
4845 class LUSearchTags(NoHooksLU):
4846 """Searches the tags for a given pattern.
4849 _OP_REQP = ["pattern"]
4851 def CheckPrereq(self):
4852 """Check prerequisites.
4854 This checks the pattern passed for validity by compiling it.
4858 self.re = re.compile(self.op.pattern)
4859 except re.error, err:
4860 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4861 (self.op.pattern, err))
4863 def Exec(self, feedback_fn):
4864 """Returns the tag list.
4868 tgts = [("/cluster", cfg.GetClusterInfo())]
4869 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4870 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4871 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4872 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4874 for path, target in tgts:
4875 for tag in target.GetTags():
4876 if self.re.search(tag):
4877 results.append((path, tag))
4881 class LUAddTags(TagsLU):
4882 """Sets a tag on a given object.
4885 _OP_REQP = ["kind", "name", "tags"]
4887 def CheckPrereq(self):
4888 """Check prerequisites.
4890 This checks the type and length of the tag name and value.
4893 TagsLU.CheckPrereq(self)
4894 for tag in self.op.tags:
4895 objects.TaggableObject.ValidateTag(tag)
4897 def Exec(self, feedback_fn):
4902 for tag in self.op.tags:
4903 self.target.AddTag(tag)
4904 except errors.TagError, err:
4905 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4907 self.cfg.Update(self.target)
4908 except errors.ConfigurationError:
4909 raise errors.OpRetryError("There has been a modification to the"
4910 " config file and the operation has been"
4911 " aborted. Please retry.")
4914 class LUDelTags(TagsLU):
4915 """Delete a list of tags from a given object.
4918 _OP_REQP = ["kind", "name", "tags"]
4920 def CheckPrereq(self):
4921 """Check prerequisites.
4923 This checks that the given tags are present on the target object.
4926 TagsLU.CheckPrereq(self)
4927 for tag in self.op.tags:
4928 objects.TaggableObject.ValidateTag(tag)
4929 del_tags = frozenset(self.op.tags)
4930 cur_tags = self.target.GetTags()
4931 if not del_tags <= cur_tags:
4932 diff_tags = del_tags - cur_tags
4933 diff_names = ["'%s'" % tag for tag in diff_tags]
4935 raise errors.OpPrereqError("Tag(s) %s not found" %
4936 (",".join(diff_names)))
4938 def Exec(self, feedback_fn):
4939 """Remove the tag from the object.
4942 for tag in self.op.tags:
4943 self.target.RemoveTag(tag)
4945 self.cfg.Update(self.target)
4946 except errors.ConfigurationError:
4947 raise errors.OpRetryError("There has been a modification to the"
4948 " config file and the operation has been"
4949 " aborted. Please retry.")
4951 class LUTestDelay(NoHooksLU):
4952 """Sleep for a specified amount of time.
4954 This LU sleeps on the master and/or nodes for a specified amount of time.
4958 _OP_REQP = ["duration", "on_master", "on_nodes"]
4960 def CheckPrereq(self):
4961 """Check prerequisites.
4963 This checks that we have a good list of nodes and/or the duration is valid.
4968 if self.op.on_nodes:
4969 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4971 def Exec(self, feedback_fn):
4972 """Do the actual sleep.
4975 if self.op.on_master:
4976 if not utils.TestDelay(self.op.duration):
4977 raise errors.OpExecError("Error during master delay test")
4978 if self.op.on_nodes:
4979 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4981 raise errors.OpExecError("Complete failure from rpc call")
4982 for node, node_result in result.items():
4984 raise errors.OpExecError("Failure during rpc call to node %s,"
4985 " result: %s" % (node, node_result))
4988 class IAllocator(object):
4989 """IAllocator framework.
4991 An IAllocator instance has the following sets of attributes:
4992 - cfg/sstore that are needed to query the cluster
4993 - input data (all members of the _KEYS class attribute are required)
4994 - four buffer attributes (in|out_data|text) that represent the
4995 input (to the external script) in text and data structure format,
4996 and the output from it, again in two formats
4997 - the result variables from the script (success, info, nodes) for easy usage
5002 "mem_size", "disks", "disk_template",
5003 "os", "tags", "nics", "vcpus",
5009 def __init__(self, cfg, sstore, mode, name, **kwargs):
5011 self.sstore = sstore
5012 # init buffer variables
5013 self.in_text = self.out_text = self.in_data = self.out_data = None
5014 # init all input fields so that pylint is happy
5017 self.mem_size = self.disks = self.disk_template = None
5018 self.os = self.tags = self.nics = self.vcpus = None
5019 self.relocate_from = None
5021 self.required_nodes = None
5022 # init result fields
5023 self.success = self.info = self.nodes = None
5024 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5025 keyset = self._ALLO_KEYS
5026 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5027 keyset = self._RELO_KEYS
5029 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5030 " IAllocator" % self.mode)
5032 if key not in keyset:
5033 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5034 " IAllocator" % key)
5035 setattr(self, key, kwargs[key])
5037 if key not in kwargs:
5038 raise errors.ProgrammerError("Missing input parameter '%s' to"
5039 " IAllocator" % key)
5040 self._BuildInputData()
5042 def _ComputeClusterData(self):
5043 """Compute the generic allocator input data.
5045 This is the data that is independent of the actual operation.
5052 "cluster_name": self.sstore.GetClusterName(),
5053 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5054 "hypervisor_type": self.sstore.GetHypervisorType(),
5055 # we don't have job IDs
5058 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5062 node_list = cfg.GetNodeList()
5063 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5064 for nname in node_list:
5065 ninfo = cfg.GetNodeInfo(nname)
5066 if nname not in node_data or not isinstance(node_data[nname], dict):
5067 raise errors.OpExecError("Can't get data for node %s" % nname)
5068 remote_info = node_data[nname]
5069 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5070 'vg_size', 'vg_free']:
5071 if attr not in remote_info:
5072 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5075 remote_info[attr] = int(remote_info[attr])
5076 except ValueError, err:
5077 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5078 " %s" % (nname, attr, str(err)))
5079 # compute memory used by primary instances
5080 i_p_mem = i_p_up_mem = 0
5081 for iinfo in i_list:
5082 if iinfo.primary_node == nname:
5083 i_p_mem += iinfo.memory
5084 if iinfo.status == "up":
5085 i_p_up_mem += iinfo.memory
5087 # compute memory used by instances
5089 "tags": list(ninfo.GetTags()),
5090 "total_memory": remote_info['memory_total'],
5091 "reserved_memory": remote_info['memory_dom0'],
5092 "free_memory": remote_info['memory_free'],
5093 "i_pri_memory": i_p_mem,
5094 "i_pri_up_memory": i_p_up_mem,
5095 "total_disk": remote_info['vg_size'],
5096 "free_disk": remote_info['vg_free'],
5097 "primary_ip": ninfo.primary_ip,
5098 "secondary_ip": ninfo.secondary_ip,
5100 node_results[nname] = pnr
5101 data["nodes"] = node_results
5105 for iinfo in i_list:
5106 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5107 for n in iinfo.nics]
5109 "tags": list(iinfo.GetTags()),
5110 "should_run": iinfo.status == "up",
5111 "vcpus": iinfo.vcpus,
5112 "memory": iinfo.memory,
5114 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5116 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5117 "disk_template": iinfo.disk_template,
5119 instance_data[iinfo.name] = pir
5121 data["instances"] = instance_data
5125 def _AddNewInstance(self):
5126 """Add new instance data to allocator structure.
5128 This, in combination with _ComputeClusterData, will create the
5129 correct structure needed as input for the allocator.
5131 The checks for the completeness of the opcode must have already been done.
5136 if len(self.disks) != 2:
5137 raise errors.OpExecError("Only two-disk configurations supported")
5139 disk_space = _ComputeDiskSize(self.disk_template,
5140 self.disks[0]["size"], self.disks[1]["size"])
5142 if self.disk_template in constants.DTS_NET_MIRROR:
5143 self.required_nodes = 2
5145 self.required_nodes = 1
5149 "disk_template": self.disk_template,
5152 "vcpus": self.vcpus,
5153 "memory": self.mem_size,
5154 "disks": self.disks,
5155 "disk_space_total": disk_space,
5157 "required_nodes": self.required_nodes,
5159 data["request"] = request
5161 def _AddRelocateInstance(self):
5162 """Add relocate instance data to allocator structure.
5164 This, in combination with _ComputeClusterData, will create the
5165 correct structure needed as input for the allocator.
5167 The checks for the completeness of the opcode must have already been done.
5171 instance = self.cfg.GetInstanceInfo(self.name)
5172 if instance is None:
5173 raise errors.ProgrammerError("Unknown instance '%s' passed to"
5174 " IAllocator" % self.name)
5176 if instance.disk_template not in constants.DTS_NET_MIRROR:
5177 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5179 if len(instance.secondary_nodes) != 1:
5180 raise errors.OpPrereqError("Instance has not exactly one secondary node")
5182 self.required_nodes = 1
5184 disk_space = _ComputeDiskSize(instance.disk_template,
5185 instance.disks[0].size,
5186 instance.disks[1].size)
5191 "disk_space_total": disk_space,
5192 "required_nodes": self.required_nodes,
5193 "relocate_from": self.relocate_from,
5195 self.in_data["request"] = request
5197 def _BuildInputData(self):
5198 """Build input data structures.
5201 self._ComputeClusterData()
5203 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5204 self._AddNewInstance()
5206 self._AddRelocateInstance()
5208 self.in_text = serializer.Dump(self.in_data)
5210 def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5211 """Run an instance allocator and return the results.
5216 result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5218 if not isinstance(result, tuple) or len(result) != 4:
5219 raise errors.OpExecError("Invalid result from master iallocator runner")
5221 rcode, stdout, stderr, fail = result
5223 if rcode == constants.IARUN_NOTFOUND:
5224 raise errors.OpExecError("Can't find allocator '%s'" % name)
5225 elif rcode == constants.IARUN_FAILURE:
5226 raise errors.OpExecError("Instance allocator call failed: %s,"
5228 (fail, stdout+stderr))
5229 self.out_text = stdout
5231 self._ValidateResult()
5233 def _ValidateResult(self):
5234 """Process the allocator results.
5236 This will process and, if successful, save the result in
5237 self.out_data and the other parameters.
5241 rdict = serializer.Load(self.out_text)
5242 except Exception, err:
5243 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5245 if not isinstance(rdict, dict):
5246 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5248 for key in "success", "info", "nodes":
5249 if key not in rdict:
5250 raise errors.OpExecError("Can't parse iallocator results:"
5251 " missing key '%s'" % key)
5252 setattr(self, key, rdict[key])
5254 if not isinstance(rdict["nodes"], list):
5255 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5257 self.out_data = rdict
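# Per the validation above, a well-formed allocator reply is a dict carrying
# at least the keys "success", "info" and "nodes", where "nodes" is a list,
# e.g. (illustrative only):
#   {"success": True, "info": "allocation successful", "nodes": ["node1", "node2"]}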
5260 class LUTestAllocator(NoHooksLU):
5261 """Run allocator tests.
5263 This LU runs the allocator tests
5266 _OP_REQP = ["direction", "mode", "name"]
5268 def CheckPrereq(self):
5269 """Check prerequisites.
5271 This checks the opcode parameters depending on the direction and mode of the test.
5274 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5275 for attr in ["name", "mem_size", "disks", "disk_template",
5276 "os", "tags", "nics", "vcpus"]:
5277 if not hasattr(self.op, attr):
5278 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5280 iname = self.cfg.ExpandInstanceName(self.op.name)
5281 if iname is not None:
5282 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5284 if not isinstance(self.op.nics, list):
5285 raise errors.OpPrereqError("Invalid parameter 'nics'")
5286 for row in self.op.nics:
5287 if (not isinstance(row, dict) or
5290 "bridge" not in row):
5291 raise errors.OpPrereqError("Invalid contents of the"
5292 " 'nics' parameter")
5293 if not isinstance(self.op.disks, list):
5294 raise errors.OpPrereqError("Invalid parameter 'disks'")
5295 if len(self.op.disks) != 2:
5296 raise errors.OpPrereqError("Only two-disk configurations supported")
5297 for row in self.op.disks:
5298 if (not isinstance(row, dict) or
5299 "size" not in row or
5300 not isinstance(row["size"], int) or
5301 "mode" not in row or
5302 row["mode"] not in ['r', 'w']):
5303 raise errors.OpPrereqError("Invalid contents of the"
5304 " 'disks' parameter")
5305 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5306 if not hasattr(self.op, "name"):
5307 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5308 fname = self.cfg.ExpandInstanceName(self.op.name)
5310 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5312 self.op.name = fname
5313 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5315 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5318 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5319 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5320 raise errors.OpPrereqError("Missing allocator name")
5321 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5322 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5325 def Exec(self, feedback_fn):
5326 """Run the allocator test.
5329 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5330 ial = IAllocator(self.cfg, self.sstore,
5333 mem_size=self.op.mem_size,
5334 disks=self.op.disks,
5335 disk_template=self.op.disk_template,
5339 vcpus=self.op.vcpus,
5342 ial = IAllocator(self.cfg, self.sstore,
5345 relocate_from=list(self.relocate_from),
5348 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5349 result = ial.in_text
5351 ial.Run(self.op.allocator, validate=False)
5352 result = ial.out_text