# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = self.cfg.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if self.__ssh is None:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
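
  # Typical ExpandNames usage of this helper (illustrative; the same pattern
  # appears verbatim in the instance LUs further down in this module):
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE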

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None
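
# Illustrative sketch only, not part of the original module: a minimal
# concurrent LU built on NoHooksLU, tying together the ExpandNames /
# CheckPrereq / Exec contract documented above. The opcode field
# 'node_name' is a hypothetical example parameter.
#
#   class LUExampleNoop(NoHooksLU):
#     _OP_REQP = ["node_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       # canonicalize the name, then lock just that node, in shared mode
#       expanded = self.cfg.ExpandNodeName(self.op.node_name)
#       if expanded is None:
#         raise errors.OpPrereqError("Node '%s' not known" %
#                                    self.op.node_name)
#       self.op.node_name = expanded
#       self.needed_locks = {locking.LEVEL_NODE: [expanded]}
#       self.share_locks[locking.LEVEL_NODE] = 1
#
#     def CheckPrereq(self):
#       pass
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Nothing to do on %s" % self.op.node_name)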


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
                                 " non-empty list of nodes whose name is to be"
                                 " expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()

  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: The selected output fields to check

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
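
# Illustrative usage (taken from the query LUs below): validate the fields a
# user asked for against what the LU can actually compute.
#
#   _CheckOutputFields(static=["node"],
#                      dynamic=["phys", "vg", "name", "size", "instance"],
#                      selected=self.op.output_fields)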


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
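
# For reference, the environment built above for a one-NIC instance looks
# like the following (all values illustrative; the hooks runner later adds
# the GANETI_ prefix mentioned in BuildHooksEnv):
#
#   INSTANCE_NAME=instance1.example.tld
#   INSTANCE_PRIMARY=node1.example.tld
#   INSTANCE_SECONDARIES=node2.example.tld
#   INSTANCE_OS_TYPE=debian-etch
#   INSTANCE_STATUS=up
#   INSTANCE_MEMORY=128
#   INSTANCE_VCPUS=1
#   INSTANCE_NIC0_IP=192.0.2.10
#   INSTANCE_NIC0_BRIDGE=xen-br0
#   INSTANCE_NIC0_HWADDR=aa:00:00:11:22:33
#   INSTANCE_NIC_COUNT=1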


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
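
# Illustrative: callers can override any of the computed fields, e.g. to
# force the status that is reported to the hooks:
#
#   env = _BuildInstanceHookEnvByObject(self, instance,
#                                       override={'status': 'up'})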


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    if not node_result:
      feedback_fn(" - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))

    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    return bad
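
  # For reference (derived from the checks above), a healthy node_result
  # from the node-verify RPC looks roughly like this (values illustrative):
  #
  #   {'filelist': {<file_name>: <checksum>, ...},
  #    'nodelist': {},        # per-node ssh failures, empty when all ok
  #    'node-net-test': {},   # per-node tcp failures, empty when all ok
  #    'hypervisor': {<hv_name>: None}}  # None means the hv verify passed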

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          instance not in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = []
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
    all_vglist = self.rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': hypervisors,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    all_rversion = self.rpc.call_version(nodelist)
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                        self.cfg.GetHypervisorType())

    cluster = self.cfg.GetClusterInfo()
    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if not isinstance(nodeinstance, list):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
          }
      except ValueError:
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn(" - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not self.rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
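
# Illustrative example (assumes a DRBD dev_type constant such as LD_DRBD8):
# for a network-mirrored disk whose children are two logical volumes (data
# and meta), the recursion above returns True as soon as it hits the first
# LV child:
#
#   drbd_disk = objects.Disk(dev_type=constants.LD_DRBD8,
#                            children=[lv_data, lv_meta])
#   _RecursiveCheckIfLVMBased(drbd_disk)  # -> True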


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # beparams changes do not need validation (we can't validate?),
    # but we still process here
    if self.op.beparams:
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    self.cfg.Update(self.cluster)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.proc.LogWarning("Can't compute data for node %s/%s" %
                           (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
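
# For reference (derived from the unpacking above), each mirror-status entry
# returned by call_blockdev_getmirrorstatus is a 4-tuple:
#
#   (perc_done, est_time, is_degraded, ldisk)
#
# where perc_done is None once the device is fully in sync. Only the first
# three fields are used here; the trailing ldisk field is deliberately
# ignored (see the comment in the loop).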


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    Args:
      node_list: a list with the names of all nodes
      rlist: a map with node names as keys and OS objects as values

    Returns:
      map: a map with osnames as keys and as value another map, with
           node names as keys and list of OS objects as values
           e.g. {"debian-etch": {"node1": [<object>,...],
                                 "node2": [<object>,]}
                }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      "serial_no",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name or '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not self.rpc.call_node_has_ip_address(new_node.name,
                                               new_node.secondary_ip):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
      }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = []
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "hypervisor_type": cluster.hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": cluster.hvparams,
      "beparams": cluster.beparams,
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = ["output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

    static_fields = ["cluster_name", "master_node", "drain_flag"]
    _CheckOutputFields(static=static_fields,
                       dynamic=[],
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info
2006 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2007 """Prepare the block devices for an instance.
2009 This sets up the block devices on all nodes.
2012 instance: a ganeti.objects.Instance object
2013 ignore_secondaries: if true, errors on secondary nodes won't result
2014 in an error return from the function
2017 false if the operation failed
2018 list of (host, instance_visible_name, node_visible_name) if the operation
2019 suceeded with the mapping from node devices to instance devices
2023 iname = instance.name
2024 # With the two passes mechanism we try to reduce the window of
2025 # opportunity for the race condition of switching DRBD to primary
2026 # before handshaking occured, but we do not eliminate it
2028 # The proper fix would be to wait (with some limits) until the
2029 # connection has been made and drbd transitions from WFConnection
2030 # into any other network-connected state (Connected, SyncTarget,
2033 # 1st pass, assemble on all nodes in secondary mode
2034 for inst_disk in instance.disks:
2035 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2036 lu.cfg.SetDiskID(node_disk, node)
2037 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2039 logger.Error("could not prepare block device %s on node %s"
2040 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2041 if not ignore_secondaries:
2044 # FIXME: race condition on drbd migration to primary
2046 # 2nd pass, do only the primary node
2047 for inst_disk in instance.disks:
2048 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2049 if node != instance.primary_node:
2051 lu.cfg.SetDiskID(node_disk, node)
2052 result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2054 logger.Error("could not prepare block device %s on node %s"
2055 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2057 device_info.append((instance.primary_node, inst_disk.iv_name, result))
2059 # leave the disks configured for the primary node
2060 # this is a workaround that would be fixed better by
2061 # improving the logical/physical id handling
2062 for disk in instance.disks:
2063 lu.cfg.SetDiskID(disk, instance.primary_node)
2065 return disks_ok, device_info
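
# Illustrative sketch only (not called by any LU): how a caller consumes the
# (disks_ok, device_info) pair returned above, mirroring the cleanup pattern
# of _StartInstanceDisks below. The feedback_fn parameter stands in for the
# usual Exec-style feedback callback.
def _ExampleAssembleAndReport(lu, instance, feedback_fn):
  """Assemble an instance's disks and report the device mapping (sketch)."""
  disks_ok, device_info = _AssembleInstanceDisks(lu, instance)
  if not disks_ok:
    # clean up whatever was partially assembled before giving up
    _ShutdownInstanceDisks(lu, instance)
    raise errors.OpExecError("Cannot activate block devices")
  for node, iv_name, device in device_info:
    feedback_fn("  - %s on node %s assembled as %s" % (iv_name, node, device))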
2068 def _StartInstanceDisks(lu, instance, force):
2069 """Start the disks of an instance.
2072 disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2073 ignore_secondaries=force)
2075 _ShutdownInstanceDisks(lu, instance)
2076 if force is not None and not force:
2077 logger.Error("If the message above refers to a secondary node,"
2078 " you can retry the operation using '--force'.")
2079 raise errors.OpExecError("Disk consistency error")
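
# Illustrative usage of the force parameter above: callers that merely need
# the disks up (reinstall, rename) pass None, which suppresses the '--force'
# hint, while LUStartupInstance passes the user-supplied flag:
#
#   _StartInstanceDisks(lu, instance, None)    # internal use, no retry hint
#   _StartInstanceDisks(lu, instance, force)   # user-driven start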
2082 class LUDeactivateInstanceDisks(NoHooksLU):
2083 """Shutdown an instance's disks.
2086 _OP_REQP = ["instance_name"]
2089 def ExpandNames(self):
2090 self._ExpandAndLockInstance()
2091 self.needed_locks[locking.LEVEL_NODE] = []
2092 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2094 def DeclareLocks(self, level):
2095 if level == locking.LEVEL_NODE:
2096 self._LockInstancesNodes()
2098 def CheckPrereq(self):
2099 """Check prerequisites.
2101 This checks that the instance is in the cluster.
2104 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2105 assert self.instance is not None, \
2106 "Cannot retrieve locked instance %s" % self.op.instance_name
2108 def Exec(self, feedback_fn):
2109 """Deactivate the disks
2112 instance = self.instance
2113 _SafeShutdownInstanceDisks(self, instance)
2116 def _SafeShutdownInstanceDisks(lu, instance):
2117 """Shutdown block devices of an instance.
2119 This function checks if an instance is running, before calling
2120 _ShutdownInstanceDisks.
2123 ins_l = lu.rpc.call_instance_list([instance.primary_node],
2124 [instance.hypervisor])
2125 ins_l = ins_l[instance.primary_node]
2126 if not isinstance(ins_l, list):
2127 raise errors.OpExecError("Can't contact node '%s'" %
2128 instance.primary_node)
2130 if instance.name in ins_l:
2131 raise errors.OpExecError("Instance is running, can't shutdown"
2134 _ShutdownInstanceDisks(lu, instance)
2137 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2138 """Shutdown block devices of an instance.
2140 This does the shutdown on all nodes of the instance.
2142 If the ignore_primary is false, errors on the primary node are
2147 for disk in instance.disks:
2148 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2149 lu.cfg.SetDiskID(top_disk, node)
2150 if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2151 logger.Error("could not shutdown block device %s on node %s" %
2152 (disk.iv_name, node))
2153 if not ignore_primary or node != instance.primary_node:
2158 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2159 """Checks if a node has enough free memory.
2161 This function checks if a given node has the needed amount of free
2162 memory. In case the node has less memory or we cannot get the
2163 information from the node, this function raises an OpPrereqError
2166 @type lu: C{LogicalUnit}
2167 @param lu: a logical unit from which we get configuration data
2169 @param node: the node to check
2170 @type reason: C{str}
2171 @param reason: string to use in the error message
2172 @type requested: C{int}
2173 @param requested: the amount of memory in MiB to check for
2174 @type hypervisor: C{str}
2175 @param hypervisor: the hypervisor to ask for memory stats
2176 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2177 we cannot check the node
2180 nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2181 if not nodeinfo or not isinstance(nodeinfo, dict):
2182 raise errors.OpPrereqError("Could not contact node %s for resource"
2183 " information" % (node,))
2185 free_mem = nodeinfo[node].get('memory_free')
2186 if not isinstance(free_mem, int):
2187 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2188 " was '%s'" % (node, free_mem))
2189 if requested > free_mem:
2190 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2191 " needed %s MiB, available %s MiB" %
2192 (node, reason, requested, free_mem))
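
# A typical call site, as used by LUStartupInstance below: the requested
# amount comes from the instance's filled beparams.
#
#   bep = lu.cfg.GetClusterInfo().FillBE(instance)
#   _CheckNodeFreeMemory(lu, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)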
2195 class LUStartupInstance(LogicalUnit):
2196 """Starts an instance.
2199 HPATH = "instance-start"
2200 HTYPE = constants.HTYPE_INSTANCE
2201 _OP_REQP = ["instance_name", "force"]
2204 def ExpandNames(self):
2205 self._ExpandAndLockInstance()
2206 self.needed_locks[locking.LEVEL_NODE] = []
2207 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2209 def DeclareLocks(self, level):
2210 if level == locking.LEVEL_NODE:
2211 self._LockInstancesNodes()
2213 def BuildHooksEnv(self):
2216 This runs on master, primary and secondary nodes of the instance.
2220 "FORCE": self.op.force,
2222 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2223 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2224 list(self.instance.secondary_nodes))
2227 def CheckPrereq(self):
2228 """Check prerequisites.
2230 This checks that the instance is in the cluster.
2233 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2234 assert self.instance is not None, \
2235 "Cannot retrieve locked instance %s" % self.op.instance_name
2237 bep = self.cfg.GetClusterInfo().FillBE(instance)
2238 # check bridge existence
2239 _CheckInstanceBridgesExist(self, instance)
2241 _CheckNodeFreeMemory(self, instance.primary_node,
2242 "starting instance %s" % instance.name,
2243 bep[constants.BE_MEMORY], instance.hypervisor)
2245 def Exec(self, feedback_fn):
2246 """Start the instance.
2249 instance = self.instance
2250 force = self.op.force
2251 extra_args = getattr(self.op, "extra_args", "")
2253 self.cfg.MarkInstanceUp(instance.name)
2255 node_current = instance.primary_node
2257 _StartInstanceDisks(self, instance, force)
2259 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2260 _ShutdownInstanceDisks(self, instance)
2261 raise errors.OpExecError("Could not start instance")
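
# Illustrative sketch: this LU is normally reached via its opcode; the
# required fields follow the _OP_REQP list above (the instance name here is
# an example only):
#
#   op = opcodes.OpStartupInstance(instance_name="inst1.example.com",
#                                  force=False)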
2264 class LURebootInstance(LogicalUnit):
2265 """Reboot an instance.
2268 HPATH = "instance-reboot"
2269 HTYPE = constants.HTYPE_INSTANCE
2270 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2273 def ExpandNames(self):
2274 if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2275 constants.INSTANCE_REBOOT_HARD,
2276 constants.INSTANCE_REBOOT_FULL]:
2277 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2278 (constants.INSTANCE_REBOOT_SOFT,
2279 constants.INSTANCE_REBOOT_HARD,
2280 constants.INSTANCE_REBOOT_FULL))
2281 self._ExpandAndLockInstance()
2282 self.needed_locks[locking.LEVEL_NODE] = []
2283 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2285 def DeclareLocks(self, level):
2286 if level == locking.LEVEL_NODE:
2287 primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2288 self._LockInstancesNodes(primary_only=primary_only)
2290 def BuildHooksEnv(self):
2293 This runs on master, primary and secondary nodes of the instance.
2297 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2299 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2300 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2301 list(self.instance.secondary_nodes))
2304 def CheckPrereq(self):
2305 """Check prerequisites.
2307 This checks that the instance is in the cluster.
2310 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2311 assert self.instance is not None, \
2312 "Cannot retrieve locked instance %s" % self.op.instance_name
2314 # check bridge existence
2315 _CheckInstanceBridgesExist(self, instance)
2317 def Exec(self, feedback_fn):
2318 """Reboot the instance.
2321 instance = self.instance
2322 ignore_secondaries = self.op.ignore_secondaries
2323 reboot_type = self.op.reboot_type
2324 extra_args = getattr(self.op, "extra_args", "")
2326 node_current = instance.primary_node
2328 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2329 constants.INSTANCE_REBOOT_HARD]:
2330 if not self.rpc.call_instance_reboot(node_current, instance,
2331 reboot_type, extra_args):
2332 raise errors.OpExecError("Could not reboot instance")
2334 if not self.rpc.call_instance_shutdown(node_current, instance):
2335 raise errors.OpExecError("Could not shutdown instance for full reboot")
2336 _ShutdownInstanceDisks(self, instance)
2337 _StartInstanceDisks(self, instance, ignore_secondaries)
2338 if not self.rpc.call_instance_start(node_current, instance, extra_args):
2339 _ShutdownInstanceDisks(self, instance)
2340 raise errors.OpExecError("Could not start instance for full reboot")
2342 self.cfg.MarkInstanceUp(instance.name)
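
# Illustrative sketch of the three reboot flavours handled above (the
# instance name is an example only):
#
#   op = opcodes.OpRebootInstance(instance_name="inst1.example.com",
#                                 reboot_type=constants.INSTANCE_REBOOT_SOFT,
#                                 ignore_secondaries=False)
#
# SOFT and HARD are delegated to the hypervisor via call_instance_reboot,
# while FULL is implemented above as shutdown + disk restart + start.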
2345 class LUShutdownInstance(LogicalUnit):
2346 """Shutdown an instance.
2349 HPATH = "instance-stop"
2350 HTYPE = constants.HTYPE_INSTANCE
2351 _OP_REQP = ["instance_name"]
2354 def ExpandNames(self):
2355 self._ExpandAndLockInstance()
2356 self.needed_locks[locking.LEVEL_NODE] = []
2357 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2359 def DeclareLocks(self, level):
2360 if level == locking.LEVEL_NODE:
2361 self._LockInstancesNodes()
2363 def BuildHooksEnv(self):
2366 This runs on master, primary and secondary nodes of the instance.
2369 env = _BuildInstanceHookEnvByObject(self, self.instance)
2370 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2371 list(self.instance.secondary_nodes))
2374 def CheckPrereq(self):
2375 """Check prerequisites.
2377 This checks that the instance is in the cluster.
2380 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2381 assert self.instance is not None, \
2382 "Cannot retrieve locked instance %s" % self.op.instance_name
2384 def Exec(self, feedback_fn):
2385 """Shutdown the instance.
2388 instance = self.instance
2389 node_current = instance.primary_node
2390 self.cfg.MarkInstanceDown(instance.name)
2391 if not self.rpc.call_instance_shutdown(node_current, instance):
2392 logger.Error("could not shutdown instance")
2394 _ShutdownInstanceDisks(self, instance)
2397 class LUReinstallInstance(LogicalUnit):
2398 """Reinstall an instance.
2401 HPATH = "instance-reinstall"
2402 HTYPE = constants.HTYPE_INSTANCE
2403 _OP_REQP = ["instance_name"]
2406 def ExpandNames(self):
2407 self._ExpandAndLockInstance()
2408 self.needed_locks[locking.LEVEL_NODE] = []
2409 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2411 def DeclareLocks(self, level):
2412 if level == locking.LEVEL_NODE:
2413 self._LockInstancesNodes()
2415 def BuildHooksEnv(self):
2418 This runs on master, primary and secondary nodes of the instance.
2421 env = _BuildInstanceHookEnvByObject(self, self.instance)
2422 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2423 list(self.instance.secondary_nodes))
2426 def CheckPrereq(self):
2427 """Check prerequisites.
2429 This checks that the instance is in the cluster and is not running.
2432 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2433 assert instance is not None, \
2434 "Cannot retrieve locked instance %s" % self.op.instance_name
2436 if instance.disk_template == constants.DT_DISKLESS:
2437 raise errors.OpPrereqError("Instance '%s' has no disks" %
2438 self.op.instance_name)
2439 if instance.status != "down":
2440 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2441 self.op.instance_name)
2442 remote_info = self.rpc.call_instance_info(instance.primary_node,
2444 instance.hypervisor)
2446 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2447 (self.op.instance_name,
2448 instance.primary_node))
2450 self.op.os_type = getattr(self.op, "os_type", None)
2451 if self.op.os_type is not None:
2453 pnode = self.cfg.GetNodeInfo(
2454 self.cfg.ExpandNodeName(instance.primary_node))
2456 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2458 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2460 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2461 " primary node" % self.op.os_type)
2463 self.instance = instance
2465 def Exec(self, feedback_fn):
2466 """Reinstall the instance.
2469 inst = self.instance
2471 if self.op.os_type is not None:
2472 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2473 inst.os = self.op.os_type
2474 self.cfg.Update(inst)
2476 _StartInstanceDisks(self, inst, None)
2478 feedback_fn("Running the instance OS create scripts...")
2479 if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2481 raise errors.OpExecError("Could not install OS for instance %s"
2483 (inst.name, inst.primary_node))
2485 _ShutdownInstanceDisks(self, inst)
2488 class LURenameInstance(LogicalUnit):
2489 """Rename an instance.
2492 HPATH = "instance-rename"
2493 HTYPE = constants.HTYPE_INSTANCE
2494 _OP_REQP = ["instance_name", "new_name"]
2496 def BuildHooksEnv(self):
2499 This runs on master, primary and secondary nodes of the instance.
2502 env = _BuildInstanceHookEnvByObject(self, self.instance)
2503 env["INSTANCE_NEW_NAME"] = self.op.new_name
2504 nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2505 list(self.instance.secondary_nodes))
2508 def CheckPrereq(self):
2509 """Check prerequisites.
2511 This checks that the instance is in the cluster and is not running.
2514 instance = self.cfg.GetInstanceInfo(
2515 self.cfg.ExpandInstanceName(self.op.instance_name))
2516 if instance is None:
2517 raise errors.OpPrereqError("Instance '%s' not known" %
2518 self.op.instance_name)
2519 if instance.status != "down":
2520 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2521 self.op.instance_name)
2522 remote_info = self.rpc.call_instance_info(instance.primary_node,
2524 instance.hypervisor)
2526 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2527 (self.op.instance_name,
2528 instance.primary_node))
2529 self.instance = instance
2531 # new name verification
2532 name_info = utils.HostInfo(self.op.new_name)
2534 self.op.new_name = new_name = name_info.name
2535 instance_list = self.cfg.GetInstanceList()
2536 if new_name in instance_list:
2537 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2540 if not getattr(self.op, "ignore_ip", False):
2541 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2542 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2543 (name_info.ip, new_name))
2546 def Exec(self, feedback_fn):
2547 """Reinstall the instance.
2550 inst = self.instance
2551 old_name = inst.name
2553 if inst.disk_template == constants.DT_FILE:
2554 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2556 self.cfg.RenameInstance(inst.name, self.op.new_name)
2557 # Change the instance lock. This is definitely safe while we hold the BGL
2558 self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2559 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2561 # re-read the instance from the configuration after rename
2562 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2564 if inst.disk_template == constants.DT_FILE:
2565 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2566 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2567 old_file_storage_dir,
2568 new_file_storage_dir)
2571 raise errors.OpExecError("Could not connect to node '%s' to rename"
2572 " directory '%s' to '%s' (but the instance"
2573 " has been renamed in Ganeti)" % (
2574 inst.primary_node, old_file_storage_dir,
2575 new_file_storage_dir))
2578 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2579 " (but the instance has been renamed in"
2580 " Ganeti)" % (old_file_storage_dir,
2581 new_file_storage_dir))
2583 _StartInstanceDisks(self, inst, None)
2585 if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2588 msg = ("Could not run OS rename script for instance %s on node %s"
2589 " (but the instance has been renamed in Ganeti)" %
2590 (inst.name, inst.primary_node))
2593 _ShutdownInstanceDisks(self, inst)
2596 class LURemoveInstance(LogicalUnit):
2597 """Remove an instance.
2600 HPATH = "instance-remove"
2601 HTYPE = constants.HTYPE_INSTANCE
2602 _OP_REQP = ["instance_name", "ignore_failures"]
2605 def ExpandNames(self):
2606 self._ExpandAndLockInstance()
2607 self.needed_locks[locking.LEVEL_NODE] = []
2608 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2610 def DeclareLocks(self, level):
2611 if level == locking.LEVEL_NODE:
2612 self._LockInstancesNodes()
2614 def BuildHooksEnv(self):
2617 This runs on master, primary and secondary nodes of the instance.
2620 env = _BuildInstanceHookEnvByObject(self, self.instance)
2621 nl = [self.cfg.GetMasterNode()]
2624 def CheckPrereq(self):
2625 """Check prerequisites.
2627 This checks that the instance is in the cluster.
2630 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2631 assert self.instance is not None, \
2632 "Cannot retrieve locked instance %s" % self.op.instance_name
2634 def Exec(self, feedback_fn):
2635 """Remove the instance.
2638 instance = self.instance
2639 logger.Info("shutting down instance %s on node %s" %
2640 (instance.name, instance.primary_node))
2642 if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2643 if self.op.ignore_failures:
2644 feedback_fn("Warning: can't shutdown instance")
2646 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2647 (instance.name, instance.primary_node))
2649 logger.Info("removing block devices for instance %s" % instance.name)
2651 if not _RemoveDisks(self, instance):
2652 if self.op.ignore_failures:
2653 feedback_fn("Warning: can't remove instance's disks")
2655 raise errors.OpExecError("Can't remove instance's disks")
2657 logger.Info("removing instance %s out of cluster config" % instance.name)
2659 self.cfg.RemoveInstance(instance.name)
2660 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2663 class LUQueryInstances(NoHooksLU):
2664 """Logical unit for querying instances.
2667 _OP_REQP = ["output_fields", "names"]
2670 def ExpandNames(self):
2671 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2672 hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2673 bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2674 self.static_fields = frozenset([
2675 "name", "os", "pnode", "snodes",
2676 "admin_state", "admin_ram",
2677 "disk_template", "ip", "mac", "bridge",
2678 "sda_size", "sdb_size", "vcpus", "tags",
2680 "serial_no", "hypervisor", "hvparams",
2683 _CheckOutputFields(static=self.static_fields,
2684 dynamic=self.dynamic_fields,
2685 selected=self.op.output_fields)
2687 self.needed_locks = {}
2688 self.share_locks[locking.LEVEL_INSTANCE] = 1
2689 self.share_locks[locking.LEVEL_NODE] = 1
2692 self.wanted = _GetWantedInstances(self, self.op.names)
2694 self.wanted = locking.ALL_SET
2696 self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2698 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2699 self.needed_locks[locking.LEVEL_NODE] = []
2700 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2702 def DeclareLocks(self, level):
2703 if level == locking.LEVEL_NODE and self.do_locking:
2704 self._LockInstancesNodes()
2706 def CheckPrereq(self):
2707 """Check prerequisites.
2712 def Exec(self, feedback_fn):
2713 """Computes the list of nodes and their attributes.
2716 all_info = self.cfg.GetAllInstancesInfo()
2718 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2719 elif self.wanted != locking.ALL_SET:
2720 instance_names = self.wanted
2721 missing = set(instance_names).difference(all_info.keys())
2723 raise errors.OpExecError(
2724 "Some instances were removed before retrieving their data: %s"
2727 instance_names = all_info.keys()
2729 instance_names = utils.NiceSort(instance_names)
2730 instance_list = [all_info[iname] for iname in instance_names]
2732 # begin data gathering
2734 nodes = frozenset([inst.primary_node for inst in instance_list])
2735 hv_list = list(set([inst.hypervisor for inst in instance_list]))
2738 if self.dynamic_fields.intersection(self.op.output_fields):
2740 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2742 result = node_data[name]
2744 live_data.update(result)
2745 elif result is False:
2746 bad_nodes.append(name)
2747 # else no instance is alive
2749 live_data = dict([(name, {}) for name in instance_names])
2751 # end data gathering
2756 for instance in instance_list:
2758 i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2759 i_be = self.cfg.GetClusterInfo().FillBE(instance)
2760 for field in self.op.output_fields:
2765 elif field == "pnode":
2766 val = instance.primary_node
2767 elif field == "snodes":
2768 val = list(instance.secondary_nodes)
2769 elif field == "admin_state":
2770 val = (instance.status != "down")
2771 elif field == "oper_state":
2772 if instance.primary_node in bad_nodes:
2775 val = bool(live_data.get(instance.name))
2776 elif field == "status":
2777 if instance.primary_node in bad_nodes:
2778 val = "ERROR_nodedown"
2780 running = bool(live_data.get(instance.name))
2782 if instance.status != "down":
2787 if instance.status != "down":
2791 elif field == "oper_ram":
2792 if instance.primary_node in bad_nodes:
2794 elif instance.name in live_data:
2795 val = live_data[instance.name].get("memory", "?")
2798 elif field == "disk_template":
2799 val = instance.disk_template
2801 val = instance.nics[0].ip
2802 elif field == "bridge":
2803 val = instance.nics[0].bridge
2804 elif field == "mac":
2805 val = instance.nics[0].mac
2806 elif field == "sda_size" or field == "sdb_size":
2807 disk = instance.FindDisk(field[:3])
2812 elif field == "tags":
2813 val = list(instance.GetTags())
2814 elif field == "serial_no":
2815 val = instance.serial_no
2816 elif field == "network_port":
2817 val = instance.network_port
2818 elif field == "hypervisor":
2819 val = instance.hypervisor
2820 elif field == "hvparams":
2822 elif (field.startswith(HVPREFIX) and
2823 field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2824 val = i_hv.get(field[len(HVPREFIX):], None)
2825 elif field == "beparams":
2827 elif (field.startswith(BEPREFIX) and
2828 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2829 val = i_be.get(field[len(BEPREFIX):], None)
2831 raise errors.ParameterError(field)
2838 class LUFailoverInstance(LogicalUnit):
2839 """Failover an instance.
2842 HPATH = "instance-failover"
2843 HTYPE = constants.HTYPE_INSTANCE
2844 _OP_REQP = ["instance_name", "ignore_consistency"]
2847 def ExpandNames(self):
2848 self._ExpandAndLockInstance()
2849 self.needed_locks[locking.LEVEL_NODE] = []
2850 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2852 def DeclareLocks(self, level):
2853 if level == locking.LEVEL_NODE:
2854 self._LockInstancesNodes()
2856 def BuildHooksEnv(self):
2859 This runs on master, primary and secondary nodes of the instance.
2863 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2865 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2866 nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2869 def CheckPrereq(self):
2870 """Check prerequisites.
2872 This checks that the instance is in the cluster.
2875 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2876 assert self.instance is not None, \
2877 "Cannot retrieve locked instance %s" % self.op.instance_name
2879 bep = self.cfg.GetClusterInfo().FillBE(instance)
2880 if instance.disk_template not in constants.DTS_NET_MIRROR:
2881 raise errors.OpPrereqError("Instance's disk layout is not"
2882 " network mirrored, cannot failover.")
2884 secondary_nodes = instance.secondary_nodes
2885 if not secondary_nodes:
2886 raise errors.ProgrammerError("no secondary node but using "
2887 "a mirrored disk template")
2889 target_node = secondary_nodes[0]
2890 # check memory requirements on the secondary node
2891 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2892 instance.name, bep[constants.BE_MEMORY],
2893 instance.hypervisor)
2895 # check bridge existence
2896 brlist = [nic.bridge for nic in instance.nics]
2897 if not self.rpc.call_bridges_exist(target_node, brlist):
2898 raise errors.OpPrereqError("One or more target bridges %s do not"
2899 " exist on destination node '%s'" %
2900 (brlist, target_node))
2902 def Exec(self, feedback_fn):
2903 """Failover an instance.
2905 The failover is done by shutting it down on its present node and
2906 starting it on the secondary.
2909 instance = self.instance
2911 source_node = instance.primary_node
2912 target_node = instance.secondary_nodes[0]
2914 feedback_fn("* checking disk consistency between source and target")
2915 for dev in instance.disks:
2916 # for drbd, these are drbd over lvm
2917 if not _CheckDiskConsistency(self, dev, target_node, False):
2918 if instance.status == "up" and not self.op.ignore_consistency:
2919 raise errors.OpExecError("Disk %s is degraded on target node,"
2920 " aborting failover." % dev.iv_name)
2922 feedback_fn("* shutting down instance on source node")
2923 logger.Info("Shutting down instance %s on node %s" %
2924 (instance.name, source_node))
2926 if not self.rpc.call_instance_shutdown(source_node, instance):
2927 if self.op.ignore_consistency:
2928 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2929 " anyway. Please make sure node %s is down" %
2930 (instance.name, source_node, source_node))
2932 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2933 (instance.name, source_node))
2935 feedback_fn("* deactivating the instance's disks on source node")
2936 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2937 raise errors.OpExecError("Can't shut down the instance's disks.")
2939 instance.primary_node = target_node
2940 # distribute new instance config to the other nodes
2941 self.cfg.Update(instance)
2943 # Only start the instance if it's marked as up
2944 if instance.status == "up":
2945 feedback_fn("* activating the instance's disks on target node")
2946 logger.Info("Starting instance %s on node %s" %
2947 (instance.name, target_node))
2949 disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2950 ignore_secondaries=True)
2952 _ShutdownInstanceDisks(self, instance)
2953 raise errors.OpExecError("Can't activate the instance's disks")
2955 feedback_fn("* starting the instance on the target node")
2956 if not self.rpc.call_instance_start(target_node, instance, None):
2957 _ShutdownInstanceDisks(self, instance)
2958 raise errors.OpExecError("Could not start instance %s on node %s." %
2959 (instance.name, target_node))
2962 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2963 """Create a tree of block devices on the primary node.
2965 This always creates all devices.
2969 for child in device.children:
2970 if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2973 lu.cfg.SetDiskID(device, node)
2974 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2975 instance.name, True, info)
2978 if device.physical_id is None:
2979 device.physical_id = new_id
2983 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2984 """Create a tree of block devices on a secondary node.
2986 If this device type has to be created on secondaries, create it and
2989 If not, just recurse to children keeping the same 'force' value.
2992 if device.CreateOnSecondary():
2995 for child in device.children:
2996 if not _CreateBlockDevOnSecondary(lu, node, instance,
2997 child, force, info):
3002 lu.cfg.SetDiskID(device, node)
3003 new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3004 instance.name, False, info)
3007 if device.physical_id is None:
3008 device.physical_id = new_id
3012 def _GenerateUniqueNames(lu, exts):
3013 """Generate a suitable LV name.
3015 This will generate a logical volume name for the given instance.
3020 new_id = lu.cfg.GenerateUniqueID()
3021 results.append("%s%s" % (new_id, val))
3025 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3027 """Generate a drbd8 device complete with its children.
3030 port = lu.cfg.AllocatePort()
3031 vgname = lu.cfg.GetVGName()
3032 shared_secret = lu.cfg.GenerateDRBDSecret()
3033 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3034 logical_id=(vgname, names[0]))
3035 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3036 logical_id=(vgname, names[1]))
3037 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3038 logical_id=(primary, secondary, port,
3041 children=[dev_data, dev_meta],
3046 def _GenerateDiskTemplate(lu, template_name,
3047 instance_name, primary_node,
3048 secondary_nodes, disk_sz, swap_sz,
3049 file_storage_dir, file_driver):
3050 """Generate the entire disk layout for a given template type.
3053 #TODO: compute space requirements
3055 vgname = lu.cfg.GetVGName()
3056 if template_name == constants.DT_DISKLESS:
3058 elif template_name == constants.DT_PLAIN:
3059 if len(secondary_nodes) != 0:
3060 raise errors.ProgrammerError("Wrong template configuration")
3062 names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
3063 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3064 logical_id=(vgname, names[0]),
3066 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3067 logical_id=(vgname, names[1]),
3069 disks = [sda_dev, sdb_dev]
3070 elif template_name == constants.DT_DRBD8:
3071 if len(secondary_nodes) != 1:
3072 raise errors.ProgrammerError("Wrong template configuration")
3073 remote_node = secondary_nodes[0]
3074 (minor_pa, minor_pb,
3075 minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3076 [primary_node, primary_node, remote_node, remote_node], instance_name)
3078 names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3079 ".sdb_data", ".sdb_meta"])
3080 drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3081 disk_sz, names[0:2], "sda",
3083 drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3084 swap_sz, names[2:4], "sdb",
3086 disks = [drbd_sda_dev, drbd_sdb_dev]
3087 elif template_name == constants.DT_FILE:
3088 if len(secondary_nodes) != 0:
3089 raise errors.ProgrammerError("Wrong template configuration")
3091 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3092 iv_name="sda", logical_id=(file_driver,
3093 "%s/sda" % file_storage_dir))
3094 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3095 iv_name="sdb", logical_id=(file_driver,
3096 "%s/sdb" % file_storage_dir))
3097 disks = [file_sda_dev, file_sdb_dev]
3099 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
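
# Illustrative sketch of the layout produced above for the plain (LVM-only)
# template; the volume group name and sizes are examples only:
#
#   disks = _GenerateDiskTemplate(lu, constants.DT_PLAIN, "inst1", "node1",
#                                 [], 10240, 4096, None, None)
#   # two LD_LV disks of 10240 and 4096 MB, exposed as "sda" and "sdb",
#   # with logical_ids like ("xenvg", "<unique-id>.sda")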
3103 def _GetInstanceInfoText(instance):
3104 """Compute that text that should be added to the disk's metadata.
3107 return "originstname+%s" % instance.name
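
# For example, an instance named "web1.example.com" yields the metadata
# text "originstname+web1.example.com".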
3110 def _CreateDisks(lu, instance):
3111 """Create all disks for an instance.
3113 This abstracts away some work from AddInstance.
3116 instance: the instance object
3119 True or False showing the success of the creation process
3122 info = _GetInstanceInfoText(instance)
3124 if instance.disk_template == constants.DT_FILE:
3125 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3126 result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3130 logger.Error("Could not connect to node '%s'" % instance.primary_node)
3134 logger.Error("failed to create directory '%s'" % file_storage_dir)
3137 for device in instance.disks:
3138 logger.Info("creating volume %s for instance %s" %
3139 (device.iv_name, instance.name))
3141 for secondary_node in instance.secondary_nodes:
3142 if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3143 device, False, info):
3144 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3145 (device.iv_name, device, secondary_node))
3148 if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3149 instance, device, info):
3150 logger.Error("failed to create volume %s on primary!" %
3157 def _RemoveDisks(lu, instance):
3158 """Remove all disks for an instance.
3160 This abstracts away some work from `AddInstance()` and
3161 `RemoveInstance()`. Note that in case some of the devices couldn't
3162 be removed, the removal will continue with the other ones (compare
3163 with `_CreateDisks()`).
3166 instance: the instance object
3169 True or False showing the success of the removal process
3172 logger.Info("removing block devices for instance %s" % instance.name)
3175 for device in instance.disks:
3176 for node, disk in device.ComputeNodeTree(instance.primary_node):
3177 lu.cfg.SetDiskID(disk, node)
3178 if not lu.rpc.call_blockdev_remove(node, disk):
3179 logger.Error("could not remove block device %s on node %s,"
3180 " continuing anyway" %
3181 (device.iv_name, node))
3184 if instance.disk_template == constants.DT_FILE:
3185 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3186 if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3188 logger.Error("could not remove directory '%s'" % file_storage_dir)
3194 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3195 """Compute disk size requirements in the volume group
3197 This is currently hard-coded for the two-drive layout.
3200 # Required free disk space as a function of disk and swap space
3202 constants.DT_DISKLESS: None,
3203 constants.DT_PLAIN: disk_size + swap_size,
3204 # 256 MB are added for drbd metadata, 128 MB for each drbd device
3205 constants.DT_DRBD8: disk_size + swap_size + 256,
3206 constants.DT_FILE: None,
3209 if disk_template not in req_size_dict:
3210 raise errors.ProgrammerError("Disk template '%s' size requirement"
3211 " is unknown" % disk_template)
3213 return req_size_dict[disk_template]
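
# Worked example: a DRBD8 instance with a 10240 MB data disk and 4096 MB of
# swap needs 10240 + 4096 + 256 = 14592 MB of free VG space (the extra
# 256 MB being the two 128 MB DRBD metadata volumes), while diskless and
# file-backed instances need none:
#
#   _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)  # => 14592
#   _ComputeDiskSize(constants.DT_FILE, 10240, 4096)   # => None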
3216 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3217 """Hypervisor parameter validation.
3219 This function abstracts the hypervisor parameter validation to be
3220 used in both instance create and instance modify.
3222 @type lu: L{LogicalUnit}
3223 @param lu: the logical unit for which we check
3224 @type nodenames: list
3225 @param nodenames: the list of nodes on which we should check
3226 @type hvname: string
3227 @param hvname: the name of the hypervisor we should use
3228 @type hvparams: dict
3229 @param hvparams: the parameters which we need to check
3230 @raise errors.OpPrereqError: if the parameters are not valid
3233 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3236 for node in nodenames:
3237 info = hvinfo.get(node, None)
3238 if not info or not isinstance(info, (tuple, list)):
3239 raise errors.OpPrereqError("Cannot get current information"
3240 " from node '%s' (%s)" % (node, info))
3242 raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3246 class LUCreateInstance(LogicalUnit):
3247 """Create an instance.
3250 HPATH = "instance-add"
3251 HTYPE = constants.HTYPE_INSTANCE
3252 _OP_REQP = ["instance_name", "disk_size",
3253 "disk_template", "swap_size", "mode", "start",
3254 "wait_for_sync", "ip_check", "mac",
3255 "hvparams", "beparams"]
3258 def _ExpandNode(self, node):
3259 """Expands and checks one node name.
3262 node_full = self.cfg.ExpandNodeName(node)
3263 if node_full is None:
3264 raise errors.OpPrereqError("Unknown node %s" % node)
3267 def ExpandNames(self):
3268 """ExpandNames for CreateInstance.
3270 Figure out the right locks for instance creation.
3273 self.needed_locks = {}
3275 # set optional parameters to None if they don't exist
3276 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3277 if not hasattr(self.op, attr):
3278 setattr(self.op, attr, None)
3280 # cheap checks, mostly valid constants given
3282 # verify creation mode
3283 if self.op.mode not in (constants.INSTANCE_CREATE,
3284 constants.INSTANCE_IMPORT):
3285 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3288 # disk template and mirror node verification
3289 if self.op.disk_template not in constants.DISK_TEMPLATES:
3290 raise errors.OpPrereqError("Invalid disk template name")
3292 if self.op.hypervisor is None:
3293 self.op.hypervisor = self.cfg.GetHypervisorType()
3295 cluster = self.cfg.GetClusterInfo()
3296 enabled_hvs = cluster.enabled_hypervisors
3297 if self.op.hypervisor not in enabled_hvs:
3298 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3299 " cluster (%s)" % (self.op.hypervisor,
3300 ",".join(enabled_hvs)))
3302 # check hypervisor parameter syntax (locally)
3304 filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3306 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3307 hv_type.CheckParameterSyntax(filled_hvp)
3309 # fill and remember the beparams dict
3310 self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3313 #### instance parameters check
3315 # instance name verification
3316 hostname1 = utils.HostInfo(self.op.instance_name)
3317 self.op.instance_name = instance_name = hostname1.name
3319 # this is just a preventive check, but someone might still add this
3320 # instance in the meantime, and creation will fail at lock-add time
3321 if instance_name in self.cfg.GetInstanceList():
3322 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3325 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3327 # ip validity checks
3328 ip = getattr(self.op, "ip", None)
3329 if ip is None or ip.lower() == "none":
3331 elif ip.lower() == "auto":
3332 inst_ip = hostname1.ip
3334 if not utils.IsValidIP(ip):
3335 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3336 " like a valid IP" % ip)
3338 self.inst_ip = self.op.ip = inst_ip
3339 # used in CheckPrereq for ip ping check
3340 self.check_ip = hostname1.ip
3342 # MAC address verification
3343 if self.op.mac != "auto":
3344 if not utils.IsValidMac(self.op.mac.lower()):
3345 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3348 # file storage checks
3349 if (self.op.file_driver and
3350 not self.op.file_driver in constants.FILE_DRIVER):
3351 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3352 self.op.file_driver)
3354 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3355 raise errors.OpPrereqError("File storage directory path not absolute")
3357 ### Node/iallocator related checks
3358 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3359 raise errors.OpPrereqError("One and only one of iallocator and primary"
3360 " node must be given")
3362 if self.op.iallocator:
3363 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3365 self.op.pnode = self._ExpandNode(self.op.pnode)
3366 nodelist = [self.op.pnode]
3367 if self.op.snode is not None:
3368 self.op.snode = self._ExpandNode(self.op.snode)
3369 nodelist.append(self.op.snode)
3370 self.needed_locks[locking.LEVEL_NODE] = nodelist
3372 # in case of import lock the source node too
3373 if self.op.mode == constants.INSTANCE_IMPORT:
3374 src_node = getattr(self.op, "src_node", None)
3375 src_path = getattr(self.op, "src_path", None)
3377 if src_node is None or src_path is None:
3378 raise errors.OpPrereqError("Importing an instance requires source"
3379 " node and path options")
3381 if not os.path.isabs(src_path):
3382 raise errors.OpPrereqError("The source path must be absolute")
3384 self.op.src_node = src_node = self._ExpandNode(src_node)
3385 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3386 self.needed_locks[locking.LEVEL_NODE].append(src_node)
3388 else: # INSTANCE_CREATE
3389 if getattr(self.op, "os_type", None) is None:
3390 raise errors.OpPrereqError("No guest OS specified")
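
# Illustrative sketch of the locking outcome of ExpandNames above, for a
# creation with explicit primary and secondary nodes (names are examples):
#
#   self.needed_locks == {locking.LEVEL_NODE: ["node1", "node2"]}
#   self.add_locks[locking.LEVEL_INSTANCE] == "inst1.example.com"
#
# whereas iallocator-driven creation locks all nodes (locking.ALL_SET).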
3392 def _RunAllocator(self):
3393 """Run the allocator based on input opcode.
3396 disks = [{"size": self.op.disk_size, "mode": "w"},
3397 {"size": self.op.swap_size, "mode": "w"}]
3398 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3399 "bridge": self.op.bridge}]
3400 ial = IAllocator(self,
3401 mode=constants.IALLOCATOR_MODE_ALLOC,
3402 name=self.op.instance_name,
3403 disk_template=self.op.disk_template,
3406 vcpus=self.be_full[constants.BE_VCPUS],
3407 mem_size=self.be_full[constants.BE_MEMORY],
3412 ial.Run(self.op.iallocator)
3415 raise errors.OpPrereqError("Can't compute nodes using"
3416 " iallocator '%s': %s" % (self.op.iallocator,
3418 if len(ial.nodes) != ial.required_nodes:
3419 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3420 " of nodes (%s), required %s" %
3421 (self.op.iallocator, len(ial.nodes),
3422 ial.required_nodes))
3423 self.op.pnode = ial.nodes[0]
3424 logger.ToStdout("Selected nodes for the instance: %s" %
3425 (", ".join(ial.nodes),))
3426 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3427 (self.op.instance_name, self.op.iallocator, ial.nodes))
3428 if ial.required_nodes == 2:
3429 self.op.snode = ial.nodes[1]
3431 def BuildHooksEnv(self):
3434 This runs on master, primary and secondary nodes of the instance.
3438 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3439 "INSTANCE_DISK_SIZE": self.op.disk_size,
3440 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3441 "INSTANCE_ADD_MODE": self.op.mode,
3443 if self.op.mode == constants.INSTANCE_IMPORT:
3444 env["INSTANCE_SRC_NODE"] = self.op.src_node
3445 env["INSTANCE_SRC_PATH"] = self.op.src_path
3446 env["INSTANCE_SRC_IMAGE"] = self.src_image
3448 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3449 primary_node=self.op.pnode,
3450 secondary_nodes=self.secondaries,
3451 status=self.instance_status,
3452 os_type=self.op.os_type,
3453 memory=self.be_full[constants.BE_MEMORY],
3454 vcpus=self.be_full[constants.BE_VCPUS],
3455 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3458 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3463 def CheckPrereq(self):
3464 """Check prerequisites.
3467 if (not self.cfg.GetVGName() and
3468 self.op.disk_template not in constants.DTS_NOT_LVM):
3469 raise errors.OpPrereqError("Cluster does not support lvm-based"
3473 if self.op.mode == constants.INSTANCE_IMPORT:
3474 src_node = self.op.src_node
3475 src_path = self.op.src_path
3477 export_info = self.rpc.call_export_info(src_node, src_path)
3480 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3482 if not export_info.has_section(constants.INISECT_EXP):
3483 raise errors.ProgrammerError("Corrupted export config")
3485 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3486 if (int(ei_version) != constants.EXPORT_VERSION):
3487 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3488 (ei_version, constants.EXPORT_VERSION))
3490 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3491 raise errors.OpPrereqError("Can't import instance with more than"
3494 # FIXME: are the old OSes, disk sizes, etc. useful?
3495 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3496 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3498 self.src_image = diskimage
3500 # ip ping checks (we use the same ip that was resolved in ExpandNames)
3502 if self.op.start and not self.op.ip_check:
3503 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3504 " adding an instance in start mode")
3506 if self.op.ip_check:
3507 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3508 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3509 (self.check_ip, self.op.instance_name))
3511 # bridge verification
3512 bridge = getattr(self.op, "bridge", None)
3514 self.op.bridge = self.cfg.GetDefBridge()
3516 self.op.bridge = bridge
3520 if self.op.iallocator is not None:
3521 self._RunAllocator()
3523 #### node related checks
3525 # check primary node
3526 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3527 assert self.pnode is not None, \
3528 "Cannot retrieve locked node %s" % self.op.pnode
3529 self.secondaries = []
3531 # mirror node verification
3532 if self.op.disk_template in constants.DTS_NET_MIRROR:
3533 if self.op.snode is None:
3534 raise errors.OpPrereqError("The networked disk templates need"
3536 if self.op.snode == pnode.name:
3537 raise errors.OpPrereqError("The secondary node cannot be"
3538 " the primary node.")
3539 self.secondaries.append(self.op.snode)
3541 nodenames = [pnode.name] + self.secondaries
3543 req_size = _ComputeDiskSize(self.op.disk_template,
3544 self.op.disk_size, self.op.swap_size)
3546 # Check lv size requirements
3547 if req_size is not None:
3548 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3550 for node in nodenames:
3551 info = nodeinfo.get(node, None)
3553 raise errors.OpPrereqError("Cannot get current information"
3554 " from node '%s'" % node)
3555 vg_free = info.get('vg_free', None)
3556 if not isinstance(vg_free, int):
3557 raise errors.OpPrereqError("Can't compute free disk space on"
3559 if req_size > info['vg_free']:
3560 raise errors.OpPrereqError("Not enough disk space on target node %s."
3561 " %d MB available, %d MB required" %
3562 (node, info['vg_free'], req_size))
3564 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3567 os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3569 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3570 " primary node" % self.op.os_type)
3572 # bridge check on primary node
3573 if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3574 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3575 " destination node '%s'" %
3576 (self.op.bridge, pnode.name))
3578 # memory check on primary node
3580 _CheckNodeFreeMemory(self, self.pnode.name,
3581 "creating instance %s" % self.op.instance_name,
3582 self.be_full[constants.BE_MEMORY],
3586 self.instance_status = 'up'
3588 self.instance_status = 'down'
3590 def Exec(self, feedback_fn):
3591 """Create and add the instance to the cluster.
3594 instance = self.op.instance_name
3595 pnode_name = self.pnode.name
3597 if self.op.mac == "auto":
3598 mac_address = self.cfg.GenerateMAC()
3600 mac_address = self.op.mac
3602 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3603 if self.inst_ip is not None:
3604 nic.ip = self.inst_ip
3606 ht_kind = self.op.hypervisor
3607 if ht_kind in constants.HTS_REQ_PORT:
3608 network_port = self.cfg.AllocatePort()
3612 ##if self.op.vnc_bind_address is None:
3613 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3615 # this is needed because os.path.join does not accept None arguments
3616 if self.op.file_storage_dir is None:
3617 string_file_storage_dir = ""
3619 string_file_storage_dir = self.op.file_storage_dir
3621 # build the full file storage dir path
3622 file_storage_dir = os.path.normpath(os.path.join(
3623 self.cfg.GetFileStorageDir(),
3624 string_file_storage_dir, instance))
3627 disks = _GenerateDiskTemplate(self,
3628 self.op.disk_template,
3629 instance, pnode_name,
3630 self.secondaries, self.op.disk_size,
3633 self.op.file_driver)
3635 iobj = objects.Instance(name=instance, os=self.op.os_type,
3636 primary_node=pnode_name,
3637 nics=[nic], disks=disks,
3638 disk_template=self.op.disk_template,
3639 status=self.instance_status,
3640 network_port=network_port,
3641 beparams=self.op.beparams,
3642 hvparams=self.op.hvparams,
3643 hypervisor=self.op.hypervisor,
3646 feedback_fn("* creating instance disks...")
3647 if not _CreateDisks(self, iobj):
3648 _RemoveDisks(self, iobj)
3649 self.cfg.ReleaseDRBDMinors(instance)
3650 raise errors.OpExecError("Device creation failed, reverting...")
3652 feedback_fn("adding instance %s to cluster config" % instance)
3654 self.cfg.AddInstance(iobj)
3655 # Declare that we don't want to remove the instance lock anymore, as we've
3656 # added the instance to the config
3657 del self.remove_locks[locking.LEVEL_INSTANCE]
3658 # Remove the temporary assignments for the instance's drbds
3659 self.cfg.ReleaseDRBDMinors(instance)
3661 if self.op.wait_for_sync:
3662 disk_abort = not _WaitForSync(self, iobj)
3663 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3664 # make sure the disks are not degraded (still sync-ing is ok)
3666 feedback_fn("* checking mirrors status")
3667 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3672 _RemoveDisks(self, iobj)
3673 self.cfg.RemoveInstance(iobj.name)
3674 # Make sure the instance lock gets removed
3675 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3676 raise errors.OpExecError("There are some degraded disks for"
3679 feedback_fn("creating os for instance %s on node %s" %
3680 (instance, pnode_name))
3682 if iobj.disk_template != constants.DT_DISKLESS:
3683 if self.op.mode == constants.INSTANCE_CREATE:
3684 feedback_fn("* running the instance OS create scripts...")
3685 if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3686 raise errors.OpExecError("could not add os for instance %s"
3688 (instance, pnode_name))
3690 elif self.op.mode == constants.INSTANCE_IMPORT:
3691 feedback_fn("* running the instance OS import scripts...")
3692 src_node = self.op.src_node
3693 src_image = self.src_image
3694 cluster_name = self.cfg.GetClusterName()
3695 if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3696 src_node, src_image,
3698 raise errors.OpExecError("Could not import os for instance"
3700 (instance, pnode_name))
3702 # also checked in the prereq part
3703 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3707 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3708 feedback_fn("* starting instance...")
3709 if not self.rpc.call_instance_start(pnode_name, iobj, None):
3710 raise errors.OpExecError("Could not start instance")
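
# Illustrative sketch of a minimal creation opcode, following the _OP_REQP
# list above plus the os_type/pnode/snode handling in ExpandNames; node, OS
# and size values are examples only:
#
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mode=constants.INSTANCE_CREATE,
#                                 disk_template=constants.DT_DRBD8,
#                                 disk_size=10240, swap_size=4096,
#                                 pnode="node1", snode="node2",
#                                 os_type="debian-etch", start=True,
#                                 wait_for_sync=True, ip_check=True,
#                                 mac="auto", hvparams={}, beparams={})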
3713 class LUConnectConsole(NoHooksLU):
3714 """Connect to an instance's console.
3716 This is somewhat special in that it returns the command line that
3717 you need to run on the master node in order to connect to the
3721 _OP_REQP = ["instance_name"]
3724 def ExpandNames(self):
3725 self._ExpandAndLockInstance()
3727 def CheckPrereq(self):
3728 """Check prerequisites.
3730 This checks that the instance is in the cluster.
3733 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3734 assert self.instance is not None, \
3735 "Cannot retrieve locked instance %s" % self.op.instance_name
3737 def Exec(self, feedback_fn):
3738 """Connect to the console of an instance
3741 instance = self.instance
3742 node = instance.primary_node
3744 node_insts = self.rpc.call_instance_list([node],
3745 [instance.hypervisor])[node]
3746 if node_insts is False:
3747 raise errors.OpExecError("Can't connect to node %s." % node)
3749 if instance.name not in node_insts:
3750 raise errors.OpExecError("Instance %s is not running." % instance.name)
3752 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3754 hyper = hypervisor.GetHypervisor(instance.hypervisor)
3755 console_cmd = hyper.GetShellCommandForConsole(instance)
3758 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
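
# The returned value is a complete command line to be run on the master
# node; for a Xen instance it would look roughly like the following
# (illustrative only, the exact form depends on the hypervisor and the
# SshRunner settings):
#
#   ssh -t root@node1.example.com 'xm console inst1.example.com'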
3761 class LUReplaceDisks(LogicalUnit):
3762 """Replace the disks of an instance.
3765 HPATH = "mirrors-replace"
3766 HTYPE = constants.HTYPE_INSTANCE
3767 _OP_REQP = ["instance_name", "mode", "disks"]
3770 def ExpandNames(self):
3771 self._ExpandAndLockInstance()
3773 if not hasattr(self.op, "remote_node"):
3774 self.op.remote_node = None
3776 ia_name = getattr(self.op, "iallocator", None)
3777 if ia_name is not None:
3778 if self.op.remote_node is not None:
3779 raise errors.OpPrereqError("Give either the iallocator or the new"
3780 " secondary, not both")
3781 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3782 elif self.op.remote_node is not None:
3783 remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3784 if remote_node is None:
3785 raise errors.OpPrereqError("Node '%s' not known" %
3786 self.op.remote_node)
3787 self.op.remote_node = remote_node
3788 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3789 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3791 self.needed_locks[locking.LEVEL_NODE] = []
3792 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3794 def DeclareLocks(self, level):
3795 # If we're not already locking all nodes in the set we have to declare the
3796 # instance's primary/secondary nodes.
3797 if (level == locking.LEVEL_NODE and
3798 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3799 self._LockInstancesNodes()
3801 def _RunAllocator(self):
3802 """Compute a new secondary node using an IAllocator.
3805 ial = IAllocator(self,
3806 mode=constants.IALLOCATOR_MODE_RELOC,
3807 name=self.op.instance_name,
3808 relocate_from=[self.sec_node])
3810 ial.Run(self.op.iallocator)
3813 raise errors.OpPrereqError("Can't compute nodes using"
3814 " iallocator '%s': %s" % (self.op.iallocator,
3816 if len(ial.nodes) != ial.required_nodes:
3817 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3818 " of nodes (%s), required %s" %
3819 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3820 self.op.remote_node = ial.nodes[0]
3821 logger.ToStdout("Selected new secondary for the instance: %s" %
3822 self.op.remote_node)
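
# Illustrative sketch: in relocate mode the allocator is asked for a single
# replacement secondary, so a successful run looks like (values are
# examples only):
#
#   ial.success == True, ial.required_nodes == 1,
#   ial.nodes == ["node3.example.com"]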
3824 def BuildHooksEnv(self):
3827 This runs on the master, the primary and all the secondaries.
3831 "MODE": self.op.mode,
3832 "NEW_SECONDARY": self.op.remote_node,
3833 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3835 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3837 self.cfg.GetMasterNode(),
3838 self.instance.primary_node,
3840 if self.op.remote_node is not None:
3841 nl.append(self.op.remote_node)
3844 def CheckPrereq(self):
3845 """Check prerequisites.
3847 This checks that the instance is in the cluster.
3850 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3851 assert instance is not None, \
3852 "Cannot retrieve locked instance %s" % self.op.instance_name
3853 self.instance = instance
3855 if instance.disk_template not in constants.DTS_NET_MIRROR:
3856 raise errors.OpPrereqError("Instance's disk layout is not"
3857 " network mirrored.")
3859 if len(instance.secondary_nodes) != 1:
3860 raise errors.OpPrereqError("The instance has a strange layout,"
3861 " expected one secondary but found %d" %
3862 len(instance.secondary_nodes))
3864 self.sec_node = instance.secondary_nodes[0]
3866 ia_name = getattr(self.op, "iallocator", None)
3867 if ia_name is not None:
3868 self._RunAllocator()
3870 remote_node = self.op.remote_node
3871 if remote_node is not None:
3872 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3873 assert self.remote_node_info is not None, \
3874 "Cannot retrieve locked node %s" % remote_node
3876 self.remote_node_info = None
3877 if remote_node == instance.primary_node:
3878 raise errors.OpPrereqError("The specified node is the primary node of"
3880 elif remote_node == self.sec_node:
3881 if self.op.mode == constants.REPLACE_DISK_SEC:
3882 # this is for DRBD8, where we can't execute the same mode of
3883 # replacement as for drbd7 (no different port allocated)
3884 raise errors.OpPrereqError("Same secondary given, cannot execute"
3886 if instance.disk_template == constants.DT_DRBD8:
3887 if (self.op.mode == constants.REPLACE_DISK_ALL and
3888 remote_node is not None):
3889 # switch to replace secondary mode
3890 self.op.mode = constants.REPLACE_DISK_SEC
3892 if self.op.mode == constants.REPLACE_DISK_ALL:
3893 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3894 " secondary disk replacement, not"
3896 elif self.op.mode == constants.REPLACE_DISK_PRI:
3897 if remote_node is not None:
3898 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3899 " the secondary while doing a primary"
3900 " node disk replacement")
3901 self.tgt_node = instance.primary_node
3902 self.oth_node = instance.secondary_nodes[0]
3903 elif self.op.mode == constants.REPLACE_DISK_SEC:
3904 self.new_node = remote_node # this can be None, in which case
3905 # we don't change the secondary
3906 self.tgt_node = instance.secondary_nodes[0]
3907 self.oth_node = instance.primary_node
3909 raise errors.ProgrammerError("Unhandled disk replace mode")
3911 for name in self.op.disks:
3912 if instance.FindDisk(name) is None:
3913 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3914 (name, instance.name))
3916 def _ExecD8DiskOnly(self, feedback_fn):
3917 """Replace a disk on the primary or secondary for dbrd8.
3919 The algorithm for replace is quite complicated:
3920 - for each disk to be replaced:
3921 - create new LVs on the target node with unique names
3922 - detach old LVs from the drbd device
3923 - rename old LVs to name_replaced.<time_t>
3924 - rename new LVs to old LVs
3925 - attach the new LVs (with the old names now) to the drbd device
3926 - wait for sync across all devices
3927 - for each modified disk:
      - remove old LVs (which have the name name_replaced.<time_t>)
3930 Failures are not very well handled.
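
    As a sketch of one iteration (hypothetical device names, error
    handling omitted), the per-disk RPC sequence used below is:

      call_blockdev_removechildren(node, drbd_sda, [data, meta])
      call_blockdev_rename(node, [(data, data_replaced-<time_t>), ...])
      call_blockdev_rename(node, [(new_data, data), ...])
      call_blockdev_addchildren(node, drbd_sda, [new_data, new_meta])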
    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
3940 tgt_node = self.tgt_node
3941 oth_node = self.oth_node
3943 # Step: check device activation
3944 self.proc.LogStep(1, steps_total, "check device existence")
3945 info("checking volume groups")
3946 my_vg = cfg.GetVGName()
3947 results = self.rpc.call_vg_list([oth_node, tgt_node])
3949 raise errors.OpExecError("Can't list volume groups on the nodes")
3950 for node in oth_node, tgt_node:
3951 res = results.get(node, False)
3952 if not res or my_vg not in res:
3953 raise errors.OpExecError("Volume group '%s' not found on %s" %
3955 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
3958 for node in tgt_node, oth_node:
3959 info("checking %s on %s" % (dev.iv_name, node))
3960 cfg.SetDiskID(dev, node)
3961 if not self.rpc.call_blockdev_find(node, dev):
3962 raise errors.OpExecError("Can't find device %s on node %s" %
3963 (dev.iv_name, node))
3965 # Step: check other node consistency
3966 self.proc.LogStep(2, steps_total, "check peer consistency")
3967 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
3970 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3971 if not _CheckDiskConsistency(self, dev, oth_node,
3972 oth_node==instance.primary_node):
3973 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3974 " to replace disks on this node (%s)" %
3975 (oth_node, tgt_node))
3977 # Step: create new storage
3978 self.proc.LogStep(3, steps_total, "allocate new storage")
3979 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
3983 cfg.SetDiskID(dev, tgt_node)
3984 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3985 names = _GenerateUniqueNames(self, lv_names)
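      # a DRBD8 disk is backed by two LVs: a data LV of the disk's size
      # and a small (here 128 MiB) metadata LV, created below under the
      # fresh unique names just generated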
3986 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3987 logical_id=(vgname, names[0]))
3988 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3989 logical_id=(vgname, names[1]))
3990 new_lvs = [lv_data, lv_meta]
3991 old_lvs = dev.children
3992 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3993 info("creating new local storage on %s for %s" %
3994 (tgt_node, dev.iv_name))
3995 # since we *always* want to create this LV, we use the
3996 # _Create...OnPrimary (which forces the creation), even if we
3997 # are talking about the secondary node
3998 for new_lv in new_lvs:
3999 if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4000 _GetInstanceInfoText(instance)):
4001 raise errors.OpExecError("Failed to create new LV named '%s' on"
4003 (new_lv.logical_id[1], tgt_node))
4005 # Step: for each lv, detach+rename*2+attach
4006 self.proc.LogStep(4, steps_total, "change drbd configuration")
4007 for dev, old_lvs, new_lvs in iv_names.itervalues():
4008 info("detaching %s drbd from local storage" % dev.iv_name)
4009 if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4010 raise errors.OpExecError("Can't detach drbd from local storage on node"
4011 " %s for device %s" % (tgt_node, dev.iv_name))
4013 #cfg.Update(instance)
4015 # ok, we created the new LVs, so now we know we have the needed
4016 # storage; as such, we proceed on the target node to rename
4017 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4018 # using the assumption that logical_id == physical_id (which in
4019 # turn is the unique_id on that node)
4021 # FIXME(iustin): use a better name for the replaced LVs
4022 temp_suffix = int(time.time())
4023 ren_fn = lambda d, suff: (d.physical_id[0],
4024 d.physical_id[1] + "_replaced-%s" % suff)
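      # e.g. (hypothetical values) ren_fn turns ("xenvg", "sda_data") into
      # ("xenvg", "sda_data_replaced-1199145600")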
      # build the rename list based on what LVs exist on the node
      rlist = []
4027 for to_ren in old_lvs:
4028 find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4029 if find_res is not None: # device exists
4030 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4032 info("renaming the old LVs on the target node")
4033 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4034 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4035 # now we rename the new LVs to the old LVs
4036 info("renaming the new LVs on the target node")
4037 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4038 if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4039 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4041 for old, new in zip(old_lvs, new_lvs):
4042 new.logical_id = old.logical_id
4043 cfg.SetDiskID(new, tgt_node)
4045 for disk in old_lvs:
4046 disk.logical_id = ren_fn(disk, temp_suffix)
4047 cfg.SetDiskID(disk, tgt_node)
4049 # now that the new lvs have the old name, we can add them to the device
4050 info("adding new mirror component on %s" % tgt_node)
4051 if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4052 for new_lv in new_lvs:
4053 if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4054 warning("Can't rollback device %s", hint="manually cleanup unused"
4056 raise errors.OpExecError("Can't add local storage to drbd")
4058 dev.children = new_lvs
4059 cfg.Update(instance)
4061 # Step: wait for sync
4063 # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
4066 self.proc.LogStep(5, steps_total, "sync devices")
4067 _WaitForSync(self, instance, unlock=True)
4069 # so check manually all the devices
4070 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4071 cfg.SetDiskID(dev, instance.primary_node)
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4076 # Step: remove old storage
4077 self.proc.LogStep(6, steps_total, "removing old storage")
4078 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4079 info("remove logical volumes for %s" % name)
4081 cfg.SetDiskID(lv, tgt_node)
4082 if not self.rpc.call_blockdev_remove(tgt_node, lv):
4083 warning("Can't remove old LV", hint="manually remove unused LVs")
4086 def _ExecD8Secondary(self, feedback_fn):
4087 """Replace the secondary node for drbd8.
4089 The algorithm for replace is quite complicated:
4090 - for all disks of the instance:
4091 - create new LVs on the new node with same names
4092 - shutdown the drbd device on the old secondary
4093 - disconnect the drbd network on the primary
4094 - create the drbd device on the new secondary
4095 - network attach the drbd on the primary, using an artifice:
4096 the drbd code for Attach() will connect to the network if it
        finds a device which is connected to the good local disks but
        not network enabled
4099 - wait for sync across all devices
4100 - remove all disks from the old secondary
4102 Failures are not very well handled.
    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
4112 old_node = self.tgt_node
4113 new_node = self.new_node
4114 pri_node = instance.primary_node
4116 # Step: check device activation
4117 self.proc.LogStep(1, steps_total, "check device existence")
4118 info("checking volume groups")
4119 my_vg = cfg.GetVGName()
4120 results = self.rpc.call_vg_list([pri_node, new_node])
4122 raise errors.OpExecError("Can't list volume groups on the nodes")
4123 for node in pri_node, new_node:
4124 res = results.get(node, False)
4125 if not res or my_vg not in res:
4126 raise errors.OpExecError("Volume group '%s' not found on %s" %
4128 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
4131 info("checking %s on %s" % (dev.iv_name, pri_node))
4132 cfg.SetDiskID(dev, pri_node)
4133 if not self.rpc.call_blockdev_find(pri_node, dev):
4134 raise errors.OpExecError("Can't find device %s on node %s" %
4135 (dev.iv_name, pri_node))
4137 # Step: check other node consistency
4138 self.proc.LogStep(2, steps_total, "check peer consistency")
4139 for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
4142 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4143 if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4144 raise errors.OpExecError("Primary node (%s) has degraded storage,"
4145 " unsafe to replace the secondary" %
4148 # Step: create new storage
4149 self.proc.LogStep(3, steps_total, "allocate new storage")
4150 for dev in instance.disks:
4152 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4153 # since we *always* want to create this LV, we use the
4154 # _Create...OnPrimary (which forces the creation), even if we
4155 # are talking about the secondary node
4156 for new_lv in dev.children:
4157 if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4158 _GetInstanceInfoText(instance)):
4159 raise errors.OpExecError("Failed to create new LV named '%s' on"
4161 (new_lv.logical_id[1], new_node))
    # Step 4: drbd minors and drbd setup changes
4165 # after this, we must manually remove the drbd minors on both the
4166 # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
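    # one minor is requested per instance disk, so 'minors' is a list
    # parallel to instance.disks and can be zip()-ed with it below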
4169 logging.debug("Allocated minors %s" % (minors,))
4170 self.proc.LogStep(4, steps_total, "changing drbd configuration")
4171 for dev, new_minor in zip(instance.disks, minors):
4173 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4174 # create new devices on new_node
4175 if pri_node == dev.logical_id[0]:
4176 new_logical_id = (pri_node, new_node,
                          dev.logical_id[2], dev.logical_id[3], new_minor,
                          dev.logical_id[5])
      else:
        new_logical_id = (new_node, pri_node,
                          dev.logical_id[2], new_minor, dev.logical_id[4],
                          dev.logical_id[5])
4183 iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
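      # iv_names maps e.g. 'sda' -> (disk object, its backing LVs, the new
      # DRBD logical id); it drives the config update and cleanup below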
4184 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4186 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4187 logical_id=new_logical_id,
4188 children=dev.children)
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
                                        new_drbd, False,
4191 _GetInstanceInfoText(instance)):
4192 self.cfg.ReleaseDRBDMinors(instance.name)
4193 raise errors.OpExecError("Failed to create new DRBD on"
4194 " node '%s'" % new_node)
4196 for dev in instance.disks:
4197 # we have new devices, shutdown the drbd on the old secondary
4198 info("shutting down drbd for %s on old node" % dev.iv_name)
4199 cfg.SetDiskID(dev, old_node)
4200 if not self.rpc.call_blockdev_shutdown(old_node, dev):
4201 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4202 hint="Please cleanup this device manually as soon as possible")
4204 info("detaching primary drbds from the network (=> standalone)")
4206 for dev in instance.disks:
4207 cfg.SetDiskID(dev, pri_node)
4208 # set the network part of the physical (unique in bdev terms) id
4209 # to None, meaning detach from network
4210 dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
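      # only the leading (network) components are blanked; whatever follows
      # in physical_id[4:], e.g. minor/secret data, is preserved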
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if self.rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)
    if not done:
      # no detaches succeeded (very unlikely)
4221 self.cfg.ReleaseDRBDMinors(instance.name)
4222 raise errors.OpExecError("Can't detach at least one DRBD from old node")
4224 # if we managed to detach at least one, we update all the disks of
4225 # the instance to point to the new secondary
4226 info("updating instance configuration")
4227 for dev, _, new_logical_id in iv_names.itervalues():
4228 dev.logical_id = new_logical_id
4229 cfg.SetDiskID(dev, pri_node)
4230 cfg.Update(instance)
4231 # we can remove now the temp minors as now the new values are
4232 # written to the config file (and therefore stable)
4233 self.cfg.ReleaseDRBDMinors(instance.name)
4235 # and now perform the drbd attach
4236 info("attaching primary drbds to new secondary (standalone => connected)")
4238 for dev in instance.disks:
4239 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4240 # since the attach is smart, it's enough to 'find' the device,
4241 # it will automatically activate the network, if the physical_id
4243 cfg.SetDiskID(dev, pri_node)
4244 logging.debug("Disk to attach: %s", dev)
4245 if not self.rpc.call_blockdev_find(pri_node, dev):
4246 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4247 "please do a gnt-instance info to see the status of disks")
4249 # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
4252 self.proc.LogStep(5, steps_total, "sync devices")
4253 _WaitForSync(self, instance, unlock=True)
4255 # so check manually all the devices
4256 for name, (dev, old_lvs, _) in iv_names.iteritems():
4257 cfg.SetDiskID(dev, pri_node)
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4262 self.proc.LogStep(6, steps_total, "removing old storage")
4263 for name, (dev, old_lvs, _) in iv_names.iteritems():
4264 info("remove logical volumes for %s" % name)
4266 cfg.SetDiskID(lv, old_node)
4267 if not self.rpc.call_blockdev_remove(old_node, lv):
4268 warning("Can't remove LV on old secondary",
4269 hint="Cleanup stale volumes by hand")
4271 def Exec(self, feedback_fn):
4272 """Execute disk replacement.
    This dispatches the disk replacement to the appropriate handler.

    """
4277 instance = self.instance
4279 # Activate the instance disks if we're replacing them on a down instance
4280 if instance.status == "down":
4281 _StartInstanceDisks(self, instance, True)
4283 if instance.disk_template == constants.DT_DRBD8:
4284 if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")
4291 ret = fn(feedback_fn)
4293 # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(self, instance)

    return ret
4300 class LUGrowDisk(LogicalUnit):
4301 """Grow a disk of an instance.
4305 HTYPE = constants.HTYPE_INSTANCE
4306 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4309 def ExpandNames(self):
4310 self._ExpandAndLockInstance()
4311 self.needed_locks[locking.LEVEL_NODE] = []
4312 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4314 def DeclareLocks(self, level):
4315 if level == locking.LEVEL_NODE:
4316 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
4328 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl
4335 def CheckPrereq(self):
4336 """Check prerequisites.
4338 This checks that the instance is in the cluster.
4341 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4342 assert instance is not None, \
4343 "Cannot retrieve locked instance %s" % self.op.instance_name
4345 self.instance = instance
4347 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4348 raise errors.OpPrereqError("Instance's disk layout does not support"
4351 if instance.FindDisk(self.op.disk) is None:
4352 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4353 (self.op.disk, instance.name))
4355 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4356 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4357 instance.hypervisor)
4358 for node in nodenames:
4359 info = nodeinfo.get(node, None)
4361 raise errors.OpPrereqError("Cannot get current information"
4362 " from node '%s'" % node)
4363 vg_free = info.get('vg_free', None)
4364 if not isinstance(vg_free, int):
4365 raise errors.OpPrereqError("Can't compute free disk space on"
4367 if self.op.amount > info['vg_free']:
4368 raise errors.OpPrereqError("Not enough disk space on target node %s:"
4369 " %d MiB available, %d MiB required" %
4370 (node, info['vg_free'], self.op.amount))
4372 def Exec(self, feedback_fn):
4373 """Execute disk grow.
4376 instance = self.instance
4377 disk = instance.FindDisk(self.op.disk)
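    # the disk has to be grown on every node holding it; note that the
    # tuple below puts the secondaries first and the primary last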
4378 for node in (instance.secondary_nodes + (instance.primary_node,)):
4379 self.cfg.SetDiskID(disk, node)
4380 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      if (not result or not isinstance(result, (list, tuple)) or
          len(result) != 2):
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
4387 disk.RecordGrow(self.op.amount)
4388 self.cfg.Update(instance)
4389 if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
4392 logger.Error("Warning: disk sync-ing has not returned a good status.\n"
4393 " Please check the instance.")
4396 class LUQueryInstanceData(NoHooksLU):
4397 """Query runtime instance data.
4400 _OP_REQP = ["instances", "static"]
4403 def ExpandNames(self):
4404 self.needed_locks = {}
4405 self.share_locks = dict(((i, 1) for i in locking.LEVELS))
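    # a value of 1 means shared: this is a read-only query LU, so every
    # lock it takes is acquired in shared mode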
4407 if not isinstance(self.op.instances, list):
4408 raise errors.OpPrereqError("Invalid argument type 'instances'")
4410 if self.op.instances:
4411 self.wanted_names = []
4412 for name in self.op.instances:
4413 full_name = self.cfg.ExpandInstanceName(name)
4414 if full_name is None:
4415 raise errors.OpPrereqError("Instance '%s' not known" %
4416 self.op.instance_name)
4417 self.wanted_names.append(full_name)
4418 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
4421 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4423 self.needed_locks[locking.LEVEL_NODE] = []
4424 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4426 def DeclareLocks(self, level):
4427 if level == locking.LEVEL_NODE:
4428 self._LockInstancesNodes()
4430 def CheckPrereq(self):
4431 """Check prerequisites.
4433 This only checks the optional instance list against the existing names.
4436 if self.wanted_names is None:
4437 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4439 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4440 in self.wanted_names]
4443 def _ComputeDiskStatus(self, instance, snode, dev):
4444 """Compute block device status.
4447 static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
    else:
      dev_pstatus = None
4454 if dev.dev_type in constants.LDS_DRBD:
4455 # we change the snode then (otherwise we use the one passed in)
4456 if dev.logical_id[0] == instance.primary_node:
4457 snode = dev.logical_id[1]
4459 snode = dev.logical_id[0]
4461 if snode and not static:
4462 self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
4468 dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []
4474 "iv_name": dev.iv_name,
4475 "dev_type": dev.dev_type,
4476 "logical_id": dev.logical_id,
4477 "physical_id": dev.physical_id,
4478 "pstatus": dev_pstatus,
4479 "sstatus": dev_sstatus,
4480 "children": dev_children,
4485 def Exec(self, feedback_fn):
4486 """Gather and return data"""
4489 cluster = self.cfg.GetClusterInfo()
4491 for instance in self.wanted_instances:
4492 if not self.op.static:
4493 remote_info = self.rpc.call_instance_info(instance.primary_node,
4495 instance.hypervisor)
4496 if remote_info and "state" in remote_info:
4499 remote_state = "down"
4502 if instance.status == "down":
4503 config_state = "down"
4507 disks = [self._ComputeDiskStatus(instance, None, device)
4508 for device in instance.disks]
4511 "name": instance.name,
4512 "config_state": config_state,
4513 "run_state": remote_state,
4514 "pnode": instance.primary_node,
4515 "snodes": instance.secondary_nodes,
4517 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4519 "hypervisor": instance.hypervisor,
4520 "network_port": instance.network_port,
4521 "hv_instance": instance.hvparams,
4522 "hv_actual": cluster.FillHV(instance),
4523 "be_instance": instance.beparams,
4524 "be_actual": cluster.FillBE(instance),
4527 result[instance.name] = idict
4532 class LUSetInstanceParams(LogicalUnit):
4533 """Modifies an instances's parameters.
4536 HPATH = "instance-modify"
4537 HTYPE = constants.HTYPE_INSTANCE
4538 _OP_REQP = ["instance_name", "hvparams"]
4541 def ExpandNames(self):
4542 self._ExpandAndLockInstance()
4543 self.needed_locks[locking.LEVEL_NODE] = []
4544 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4547 def DeclareLocks(self, level):
4548 if level == locking.LEVEL_NODE:
4549 self._LockInstancesNodes()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.do_bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
4575 args['nics'] = [(ip, bridge, mac)]
4576 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4577 nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl
4581 def CheckPrereq(self):
4582 """Check prerequisites.
4584 This only checks the instance list against the existing names.
4587 # FIXME: all the parameters could be checked before, in ExpandNames, or in
4588 # a separate CheckArguments function, if we implement one, so the operation
4589 # can be aborted without waiting for any lock, should it have an error...
4590 self.ip = getattr(self.op, "ip", None)
4591 self.mac = getattr(self.op, "mac", None)
4592 self.bridge = getattr(self.op, "bridge", None)
4593 self.kernel_path = getattr(self.op, "kernel_path", None)
4594 self.initrd_path = getattr(self.op, "initrd_path", None)
4595 self.force = getattr(self.op, "force", None)
4596 all_parms = [self.ip, self.bridge, self.mac]
4597 if (all_parms.count(None) == len(all_parms) and
4598 not self.op.hvparams and
4599 not self.op.beparams):
4600 raise errors.OpPrereqError("No changes submitted")
4601 for item in (constants.BE_MEMORY, constants.BE_VCPUS):
      val = self.op.beparams.get(item, None)
      if val is not None:
        try:
          val = int(val)
        except ValueError, err:
4607 raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4608 self.op.beparams[item] = val
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
4618 self.do_bridge = (self.bridge is not None)
4619 if self.mac is not None:
4620 if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
4623 if not utils.IsValidMac(self.mac):
4624 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4626 # checking the new params on the primary/secondary nodes
4628 instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4629 assert self.instance is not None, \
4630 "Cannot retrieve locked instance %s" % self.op.instance_name
4631 pnode = self.instance.primary_node
    nodelist = [pnode]
    nodelist.extend(instance.secondary_nodes)
4635 # hvparams processing
4636 if self.op.hvparams:
4637 i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
4646 cluster = self.cfg.GetClusterInfo()
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
4650 hypervisor.GetHypervisor(
4651 instance.hypervisor).CheckParameterSyntax(hv_new)
4652 _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4653 self.hv_new = hv_new # the new actual values
4654 self.hv_inst = i_hvdict # the new dict (without defaults)
4656 self.hv_new = self.hv_inst = {}
4658 # beparams processing
4659 if self.op.beparams:
4660 i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
4669 cluster = self.cfg.GetClusterInfo()
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
4672 self.be_new = be_new # the new actual values
4673 self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}
    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
4680 mem_check_list = [pnode]
4681 if be_new[constants.BE_AUTO_BALANCE]:
4682 # either we changed auto_balance to yes or it was from before
4683 mem_check_list.extend(instance.secondary_nodes)
4684 instance_info = self.rpc.call_instance_info(pnode, instance.name,
4685 instance.hypervisor)
4686 nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4687 instance.hypervisor)
4689 if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4690 # Assume the primary node is unreachable and go ahead
4691 self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
4696 # Assume instance not running
4697 # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
4700 miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode]['memory_free'])
        if miss_mem > 0:
4703 raise errors.OpPrereqError("This change will prevent the instance"
4704 " from starting, due to %d MB of memory"
4705 " missing on its primary node" % miss_mem)
4707 if be_new[constants.BE_AUTO_BALANCE]:
4708 for node in instance.secondary_nodes:
4709 if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4710 self.warn.append("Can't get info from secondary node %s" % node)
4711 elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4712 self.warn.append("Not enough memory to failover instance to"
4713 " secondary node %s" % node)
4717 def Exec(self, feedback_fn):
4718 """Modifies an instance.
4720 All parameters take effect only at the next restart of the instance.
4722 # Process here the warnings from CheckPrereq, as we don't have a
4723 # feedback_fn there.
4724 for warn in self.warn:
4725 feedback_fn("WARNING: %s" % warn)
    result = []
    instance = self.instance
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.do_bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
4738 if self.op.hvparams:
4739 instance.hvparams = self.hv_new
4740 for key, val in self.op.hvparams.iteritems():
4741 result.append(("hv/%s" % key, val))
4742 if self.op.beparams:
4743 instance.beparams = self.be_inst
4744 for key, val in self.op.beparams.iteritems():
4745 result.append(("be/%s" % key, val))
    self.cfg.Update(instance)

    return result
4752 class LUQueryExports(NoHooksLU):
4753 """Query the exports list
4756 _OP_REQP = ['nodes']
4759 def ExpandNames(self):
4760 self.needed_locks = {}
4761 self.share_locks[locking.LEVEL_NODE] = 1
4762 if not self.op.nodes:
4763 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
4766 _GetWantedNodes(self, self.op.nodes)
4768 def CheckPrereq(self):
4769 """Check prerequisites.
4772 self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4774 def Exec(self, feedback_fn):
4775 """Compute the list of all the exported system images.
    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
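    # example result shape (hypothetical names):
    #   {"node1.example.com": ["inst1.example.com", "inst2.example.com"]}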
4783 return self.rpc.call_export_list(self.nodes)
4786 class LUExportInstance(LogicalUnit):
4787 """Export an instance to an image in the cluster.
4790 HPATH = "instance-export"
4791 HTYPE = constants.HTYPE_INSTANCE
4792 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4795 def ExpandNames(self):
4796 self._ExpandAndLockInstance()
4797 # FIXME: lock only instance primary and destination node
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
4805 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4807 def DeclareLocks(self, level):
4808 """Last minute lock declaration."""
4809 # All nodes are locked anyway, so nothing to do here.
  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
4821 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4822 nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl
4826 def CheckPrereq(self):
4827 """Check prerequisites.
4829 This checks that the instance and node names are valid.
4832 instance_name = self.op.instance_name
4833 self.instance = self.cfg.GetInstanceInfo(instance_name)
4834 assert self.instance is not None, \
4835 "Cannot retrieve locked instance %s" % self.op.instance_name
4837 self.dst_node = self.cfg.GetNodeInfo(
4838 self.cfg.ExpandNodeName(self.op.target_node))
4840 assert self.dst_node is not None, \
4841 "Cannot retrieve locked node %s" % self.op.target_node
4843 # instance disk type verification
4844 for disk in self.instance.disks:
4845 if disk.dev_type == constants.LD_FILE:
4846 raise errors.OpPrereqError("Export not supported for instances with"
4847 " file-based disks")
4849 def Exec(self, feedback_fn):
4850 """Export an instance to an image in the cluster.
4853 instance = self.instance
4854 dst_node = self.dst_node
4855 src_node = instance.primary_node
4856 if self.op.shutdown:
4857 # shutdown the instance, but not the disks
4858 if not self.rpc.call_instance_shutdown(src_node, instance):
4859 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4860 (instance.name, src_node))
    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
4867 for disk in instance.disks:
4868 if disk.iv_name == "sda":
4869 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4870 new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4872 if not new_dev_name:
4873 logger.Error("could not snapshot block device %s on node %s" %
4874 (disk.logical_id[1], src_node))
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4877 logical_id=(vgname, new_dev_name),
4878 physical_id=(vgname, new_dev_name),
4879 iv_name=disk.iv_name)
4880 snap_disks.append(new_dev)
    finally:
      if self.op.shutdown and instance.status == "up":
4884 if not self.rpc.call_instance_start(src_node, instance, None):
4885 _ShutdownInstanceDisks(self, instance)
4886 raise errors.OpExecError("Could not start instance")
4888 # TODO: check for size
4890 cluster_name = self.cfg.GetClusterName()
4891 for dev in snap_disks:
4892 if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4893 instance, cluster_name):
4894 logger.Error("could not export block device %s from node %s to node %s"
4895 % (dev.logical_id[1], src_node, dst_node.name))
4896 if not self.rpc.call_blockdev_remove(src_node, dev):
4897 logger.Error("could not remove snapshot block device %s from node %s" %
4898 (dev.logical_id[1], src_node))
4900 if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4901 logger.Error("could not finalize export for instance %s on node %s" %
4902 (instance.name, dst_node.name))
4904 nodelist = self.cfg.GetNodeList()
4905 nodelist.remove(dst_node.name)
4907 # on one-node clusters nodelist will be empty after the removal
4908 # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
4911 exportlist = self.rpc.call_export_list(nodelist)
4912 for node in exportlist:
4913 if instance.name in exportlist[node]:
4914 if not self.rpc.call_export_remove(node, instance.name):
4915 logger.Error("could not remove older export for instance %s"
4916 " on node %s" % (instance.name, node))
4919 class LURemoveExport(NoHooksLU):
4920 """Remove exports related to the named instance.
4923 _OP_REQP = ["instance_name"]
4926 def ExpandNames(self):
4927 self.needed_locks = {}
4928 # We need all nodes to be locked in order for RemoveExport to work, but we
4929 # don't need to lock the instance itself, as nothing will happen to it (and
4930 # we can remove exports also for a removed instance)
4931 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4933 def CheckPrereq(self):
4934 """Check prerequisites.
4938 def Exec(self, feedback_fn):
4939 """Remove any export.
4942 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4943 # If the instance was not found we'll try with the name that was passed in.
4944 # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name
4950 exportlist = self.rpc.call_export_list(self.acquired_locks[
4951 locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
4956 if not self.rpc.call_export_remove(node, instance_name):
4957 logger.Error("could not remove export for instance %s"
4958 " on node %s" % (instance_name, node))
4960 if fqdn_warn and not found:
4961 feedback_fn("Export not found. If trying to remove an export belonging"
4962 " to a deleted instance please use its Fully Qualified"
4966 class TagsLU(NoHooksLU):
4969 This is an abstract class which is the parent of all the other tags LUs.
4973 def ExpandNames(self):
4974 self.needed_locks = {}
4975 if self.op.kind == constants.TAG_NODE:
4976 name = self.cfg.ExpandNodeName(self.op.name)
4978 raise errors.OpPrereqError("Invalid node name (%s)" %
4981 self.needed_locks[locking.LEVEL_NODE] = name
4982 elif self.op.kind == constants.TAG_INSTANCE:
4983 name = self.cfg.ExpandInstanceName(self.op.name)
4985 raise errors.OpPrereqError("Invalid instance name (%s)" %
4988 self.needed_locks[locking.LEVEL_INSTANCE] = name
4990 def CheckPrereq(self):
4991 """Check prerequisites.
4994 if self.op.kind == constants.TAG_CLUSTER:
4995 self.target = self.cfg.GetClusterInfo()
4996 elif self.op.kind == constants.TAG_NODE:
4997 self.target = self.cfg.GetNodeInfo(self.op.name)
4998 elif self.op.kind == constants.TAG_INSTANCE:
4999 self.target = self.cfg.GetInstanceInfo(self.op.name)
5001 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5005 class LUGetTags(TagsLU):
5006 """Returns the tags of a given object.
5009 _OP_REQP = ["kind", "name"]
5012 def Exec(self, feedback_fn):
5013 """Returns the tag list.
5016 return list(self.target.GetTags())
5019 class LUSearchTags(NoHooksLU):
5020 """Searches the tags for a given pattern.
5023 _OP_REQP = ["pattern"]
5026 def ExpandNames(self):
5027 self.needed_locks = {}
5029 def CheckPrereq(self):
5030 """Check prerequisites.
    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
5037 except re.error, err:
5038 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5039 (self.op.pattern, err))
5041 def Exec(self, feedback_fn):
5042 """Returns the tag list.
5046 tgts = [("/cluster", cfg.GetClusterInfo())]
5047 ilist = cfg.GetAllInstancesInfo().values()
5048 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5049 nlist = cfg.GetAllNodesInfo().values()
5050 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
5053 for tag in target.GetTags():
5054 if self.re.search(tag):
          results.append((path, tag))
    return results
5059 class LUAddTags(TagsLU):
5060 """Sets a tag on a given object.
5063 _OP_REQP = ["kind", "name", "tags"]
5066 def CheckPrereq(self):
5067 """Check prerequisites.
5069 This checks the type and length of the tag name and value.
5072 TagsLU.CheckPrereq(self)
5073 for tag in self.op.tags:
5074 objects.TaggableObject.ValidateTag(tag)
  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
5081 for tag in self.op.tags:
5082 self.target.AddTag(tag)
5083 except errors.TagError, err:
5084 raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
5087 except errors.ConfigurationError:
5088 raise errors.OpRetryError("There has been a modification to the"
5089 " config file and the operation has been"
5090 " aborted. Please retry.")
5093 class LUDelTags(TagsLU):
5094 """Delete a list of tags from a given object.
5097 _OP_REQP = ["kind", "name", "tags"]
5100 def CheckPrereq(self):
5101 """Check prerequisites.
5103 This checks that we have the given tag.
5106 TagsLU.CheckPrereq(self)
5107 for tag in self.op.tags:
5108 objects.TaggableObject.ValidateTag(tag)
5109 del_tags = frozenset(self.op.tags)
5110 cur_tags = self.target.GetTags()
5111 if not del_tags <= cur_tags:
5112 diff_tags = del_tags - cur_tags
5113 diff_names = ["'%s'" % tag for tag in diff_tags]
5115 raise errors.OpPrereqError("Tag(s) %s not found" %
5116 (",".join(diff_names)))
5118 def Exec(self, feedback_fn):
5119 """Remove the tag from the object.
5122 for tag in self.op.tags:
5123 self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
5126 except errors.ConfigurationError:
5127 raise errors.OpRetryError("There has been a modification to the"
5128 " config file and the operation has been"
5129 " aborted. Please retry.")
5132 class LUTestDelay(NoHooksLU):
5133 """Sleep for a specified amount of time.
  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
5139 _OP_REQP = ["duration", "on_master", "on_nodes"]
5142 def ExpandNames(self):
5143 """Expand names and set required locks.
5145 This expands the node list, if any.
5148 self.needed_locks = {}
5149 if self.op.on_nodes:
5150 # _GetWantedNodes can be used here, but is not always appropriate to use
5151 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5153 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5154 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5156 def CheckPrereq(self):
5157 """Check prerequisites.
5161 def Exec(self, feedback_fn):
5162 """Do the actual sleep.
5165 if self.op.on_master:
5166 if not utils.TestDelay(self.op.duration):
5167 raise errors.OpExecError("Error during master delay test")
5168 if self.op.on_nodes:
5169 result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5171 raise errors.OpExecError("Complete failure from rpc call")
5172 for node, node_result in result.items():
5174 raise errors.OpExecError("Failure during rpc call to node %s,"
5175 " result: %s" % (node, node_result))
5178 class IAllocator(object):
5179 """IAllocator framework.
  An IAllocator instance has four sets of attributes:
5182 - cfg that is needed to query the cluster
5183 - input data (all members of the _KEYS class attribute are required)
5184 - four buffer attributes (in|out_data|text), that represent the
5185 input (to the external script) in text and data structure format,
5186 and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]
  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
5201 # init buffer variables
5202 self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
5206 self.mem_size = self.disks = self.disk_template = None
5207 self.os = self.tags = self.nics = self.vcpus = None
5208 self.relocate_from = None
5210 self.required_nodes = None
5211 # init result fields
5212 self.success = self.info = self.nodes = None
5213 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5214 keyset = self._ALLO_KEYS
5215 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5216 keyset = self._RELO_KEYS
5218 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5219 " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
5222 raise errors.ProgrammerError("Invalid input parameter '%s' to"
5223 " IAllocator" % key)
5224 setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
5227 raise errors.ProgrammerError("Missing input parameter '%s' to"
5228 " IAllocator" % key)
5229 self._BuildInputData()
5231 def _ComputeClusterData(self):
5232 """Compute the generic allocator input data.
5234 This is the data that is independent of the actual operation.
    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
5242 "cluster_name": cfg.GetClusterName(),
5243 "cluster_tags": list(cluster_info.GetTags()),
5244 "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    i_list = []
    cluster = cfg.GetClusterInfo()
5250 for iname in cfg.GetInstanceList():
5251 i_obj = cfg.GetInstanceInfo(iname)
5252 i_list.append((i_obj, cluster.FillBE(i_obj)))
    node_results = {}
    node_list = cfg.GetNodeList()
5257 # FIXME: here we have only one hypervisor information, but
5258 # instance can belong to different hypervisors
5259 node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5260 cfg.GetHypervisorType())
5261 for nname in node_list:
5262 ninfo = cfg.GetNodeInfo(nname)
5263 if nname not in node_data or not isinstance(node_data[nname], dict):
5264 raise errors.OpExecError("Can't get data for node %s" % nname)
5265 remote_info = node_data[nname]
5266 for attr in ['memory_total', 'memory_free', 'memory_dom0',
5267 'vg_size', 'vg_free', 'cpu_total']:
5268 if attr not in remote_info:
5269 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5272 remote_info[attr] = int(remote_info[attr])
5273 except ValueError, err:
5274 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5275 " %s" % (nname, attr, str(err)))
5276 # compute memory used by primary instances
5277 i_p_mem = i_p_up_mem = 0
5278 for iinfo, beinfo in i_list:
5279 if iinfo.primary_node == nname:
5280 i_p_mem += beinfo[constants.BE_MEMORY]
5281 if iinfo.status == "up":
5282 i_p_up_mem += beinfo[constants.BE_MEMORY]
      # compute memory used by instances
      pnr = {
5286 "tags": list(ninfo.GetTags()),
5287 "total_memory": remote_info['memory_total'],
5288 "reserved_memory": remote_info['memory_dom0'],
5289 "free_memory": remote_info['memory_free'],
5290 "i_pri_memory": i_p_mem,
5291 "i_pri_up_memory": i_p_up_mem,
5292 "total_disk": remote_info['vg_size'],
5293 "free_disk": remote_info['vg_free'],
5294 "primary_ip": ninfo.primary_ip,
5295 "secondary_ip": ninfo.secondary_ip,
5296 "total_cpus": remote_info['cpu_total'],
5298 node_results[nname] = pnr
5299 data["nodes"] = node_results
5303 for iinfo, beinfo in i_list:
5304 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5305 for n in iinfo.nics]
5307 "tags": list(iinfo.GetTags()),
5308 "should_run": iinfo.status == "up",
5309 "vcpus": beinfo[constants.BE_VCPUS],
5310 "memory": beinfo[constants.BE_MEMORY],
5312 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5314 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5315 "disk_template": iinfo.disk_template,
5316 "hypervisor": iinfo.hypervisor,
5318 instance_data[iinfo.name] = pir
5320 data["instances"] = instance_data
5324 def _AddNewInstance(self):
5325 """Add new instance data to allocator structure.
    This in combination with _ComputeClusterData will create the
5328 correct structure needed as input for the allocator.
    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
5336 raise errors.OpExecError("Only two-disk configurations supported")
5338 disk_space = _ComputeDiskSize(self.disk_template,
5339 self.disks[0]["size"], self.disks[1]["size"])
5341 if self.disk_template in constants.DTS_NET_MIRROR:
5342 self.required_nodes = 2
5344 self.required_nodes = 1
5348 "disk_template": self.disk_template,
5351 "vcpus": self.vcpus,
5352 "memory": self.mem_size,
5353 "disks": self.disks,
5354 "disk_space_total": disk_space,
5356 "required_nodes": self.required_nodes,
5358 data["request"] = request
5360 def _AddRelocateInstance(self):
5361 """Add relocate instance data to allocator structure.
    This in combination with _ComputeClusterData will create the
5364 correct structure needed as input for the allocator.
    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
5371 if instance is None:
5372 raise errors.ProgrammerError("Unknown instance '%s' passed to"
5373 " IAllocator" % self.name)
5375 if instance.disk_template not in constants.DTS_NET_MIRROR:
5376 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5378 if len(instance.secondary_nodes) != 1:
5379 raise errors.OpPrereqError("Instance has not exactly one secondary node")
5381 self.required_nodes = 1
5383 disk_space = _ComputeDiskSize(instance.disk_template,
5384 instance.disks[0].size,
5385 instance.disks[1].size)
5390 "disk_space_total": disk_space,
5391 "required_nodes": self.required_nodes,
5392 "relocate_from": self.relocate_from,
5394 self.in_data["request"] = request
5396 def _BuildInputData(self):
5397 """Build input data structures.
5400 self._ComputeClusterData()
5402 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5403 self._AddNewInstance()
    else:
      self._AddRelocateInstance()
5407 self.in_text = serializer.Dump(self.in_data)
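    # in_text is the serialized form of in_data; it is the exact text the
    # master-side runner RPC hands to the external allocator script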
5409 def Run(self, name, validate=True, call_fn=None):
5410 """Run an instance allocator and return the results.
5414 call_fn = self.lu.rpc.call_iallocator_runner
5417 result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5419 if not isinstance(result, (list, tuple)) or len(result) != 4:
5420 raise errors.OpExecError("Invalid result from master iallocator runner")
5422 rcode, stdout, stderr, fail = result
5424 if rcode == constants.IARUN_NOTFOUND:
5425 raise errors.OpExecError("Can't find allocator '%s'" % name)
5426 elif rcode == constants.IARUN_FAILURE:
5427 raise errors.OpExecError("Instance allocator call failed: %s,"
5428 " output: %s" % (fail, stdout+stderr))
5429 self.out_text = stdout
    if validate:
      self._ValidateResult()
5433 def _ValidateResult(self):
5434 """Process the allocator results.
5436 This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
5442 except Exception, err:
5443 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5445 if not isinstance(rdict, dict):
5446 raise errors.OpExecError("Can't parse iallocator results: not a dict")
5448 for key in "success", "info", "nodes":
5449 if key not in rdict:
5450 raise errors.OpExecError("Can't parse iallocator results:"
5451 " missing key '%s'" % key)
5452 setattr(self, key, rdict[key])
5454 if not isinstance(rdict["nodes"], list):
5455 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5457 self.out_data = rdict
5460 class LUTestAllocator(NoHooksLU):
5461 """Run allocator tests.
5463 This LU runs the allocator tests
5466 _OP_REQP = ["direction", "mode", "name"]
5468 def CheckPrereq(self):
5469 """Check prerequisites.
    This checks the opcode parameters depending on the direction and mode
    of the test.
5474 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5475 for attr in ["name", "mem_size", "disks", "disk_template",
5476 "os", "tags", "nics", "vcpus"]:
5477 if not hasattr(self.op, attr):
5478 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5480 iname = self.cfg.ExpandInstanceName(self.op.name)
5481 if iname is not None:
5482 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5484 if not isinstance(self.op.nics, list):
5485 raise errors.OpPrereqError("Invalid parameter 'nics'")
5486 for row in self.op.nics:
5487 if (not isinstance(row, dict) or
5490 "bridge" not in row):
5491 raise errors.OpPrereqError("Invalid contents of the"
5492 " 'nics' parameter")
5493 if not isinstance(self.op.disks, list):
5494 raise errors.OpPrereqError("Invalid parameter 'disks'")
5495 if len(self.op.disks) != 2:
5496 raise errors.OpPrereqError("Only two-disk configurations supported")
5497 for row in self.op.disks:
5498 if (not isinstance(row, dict) or
5499 "size" not in row or
5500 not isinstance(row["size"], int) or
5501 "mode" not in row or
5502 row["mode"] not in ['r', 'w']):
5503 raise errors.OpPrereqError("Invalid contents of the"
5504 " 'disks' parameter")
5505 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5506 if not hasattr(self.op, "name"):
5507 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5508 fname = self.cfg.ExpandInstanceName(self.op.name)
5510 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5512 self.op.name = fname
5513 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5515 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5518 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5519 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5520 raise errors.OpPrereqError("Missing allocator name")
5521 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5522 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5525 def Exec(self, feedback_fn):
5526 """Run the allocator test.
5529 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
5534 disks=self.op.disks,
5535 disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )
5548 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5549 result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result