# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0201
26 # W0201 since most LU attributes are defined in CheckPrereq or similar

import logging
import re
import time

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq (except when tasklets are used)
    - implement Exec (except when tasklets are used)
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  @ivar dry_run_result: the value (if any) that will be returned to the caller
      in dry-run mode (signalled by opcode dry_run parameter)

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict.fromkeys(locking.LEVELS, 0)
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
    # support for dry-run
    self.dry_run_result = None
    # support for generic debug attribute
    if (not hasattr(self.op, "debug_level") or
        not isinstance(self.op.debug_level, int)):
      self.op.debug_level = 0

    # Tasklets
    self.tasklets = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name, errors.ECODE_INVAL)

    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if self.__ssh is None:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as a purely lock-related function
      - CheckPrereq is run after we have acquired locks (and possible
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    This function can also define a list of tasklets, which then will be
    executed in order instead of the usual LU-level CheckPrereq and Exec
    functions, if those are not defined by the LU.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Checking prerequisites for tasklet %s/%s",
                      idx + 1, len(self.tasklets))
        tl.CheckPrereq()
    else:
      raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    if self.tasklets is not None:
      for (idx, tl) in enumerate(self.tasklets):
        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
        tl.Exec(feedback_fn)
    else:
      raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_' as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
    @return: the new Exec result, based on the previous result

    """
    # API must be kept, thus we ignore the unused argument and could
    # be a function warnings
    # pylint: disable-msg=W0613,R0201
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    self.op.instance_name = _ExpandInstanceName(self.cfg,
                                                self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]
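

# Illustrative sketch (not part of the original module): a minimal LU
# following the rules documented above. The opcode field and hook path
# are hypothetical; real LUs are defined further down in this file.
#
#   class LUExampleInstanceOp(LogicalUnit):
#     HPATH = "instance-example"
#     HTYPE = constants.HTYPE_INSTANCE
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       # expand the name, lock the instance, and ask for its nodes to be
#       # computed (and locked) once the instance lock is held
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()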


class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

  def BuildHooksEnv(self):
    """Empty BuildHooksEnv for NoHooksLu.

    This just raises an error.

    """
    assert False, "BuildHooksEnv called for NoHooksLUs"
377 """Tasklet base class.
379 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
380 they can mix legacy code with tasklets. Locking needs to be done in the LU,
381 tasklets know nothing about locks.
383 Subclasses must follow these rules:
384 - Implement CheckPrereq
388 def __init__(self, lu):
395 def CheckPrereq(self):
396 """Check prerequisites for this tasklets.
398 This method should check whether the prerequisites for the execution of
399 this tasklet are fulfilled. It can do internode communication, but it
400 should be idempotent - no cluster or system changes are allowed.
402 The method should raise errors.OpPrereqError in case something is not
403 fulfilled. Its return value is ignored.
405 This method should also update all parameters to their canonical form if it
406 hasn't been done before.
409 raise NotImplementedError
411 def Exec(self, feedback_fn):
412 """Execute the tasklet.
414 This method should implement the actual work. It should raise
415 errors.OpExecError for failures that are somewhat dealt with in code, or
419 raise NotImplementedError
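

# Illustrative sketch (not part of the original module): a minimal tasklet
# and how an LU would chain several of them from ExpandNames. All names here
# are hypothetical.
#
#   class _ExampleTasklet(Tasklet):
#     def __init__(self, lu, instance_name):
#       Tasklet.__init__(self, lu)
#       self.instance_name = instance_name
#
#     def CheckPrereq(self):
#       if self.cfg.GetInstanceInfo(self.instance_name) is None:
#         raise errors.OpPrereqError("Instance '%s' not known" %
#                                    self.instance_name, errors.ECODE_NOENT)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Working on %s" % self.instance_name)
#
#   # in an LU's ExpandNames, instead of defining CheckPrereq/Exec:
#   #   self.tasklets = [_ExampleTasklet(self, name) for name in names]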


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'",
                               errors.ECODE_INVAL)

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'",
                               errors.ECODE_INVAL)

  if instances:
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)), errors.ECODE_INVAL)
  setattr(op, name, val)


def _CheckGlobalHvParams(params):
  """Validates that given hypervisor params are not global ones.

  This will ensure that instances don't get customised versions of
  global parameters.

  """
  used_globals = constants.HVC_GLOBALS.intersection(params)
  if used_globals:
    msg = ("The following hypervisor parameters are global and cannot"
           " be customized at instance level, please modify them at"
           " cluster level: %s" % utils.CommaJoin(used_globals))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node,
                               errors.ECODE_INVAL)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node,
                               errors.ECODE_INVAL)


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def _ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def _ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks,
                          bep, hvp, hypervisor_name):
  """Builds instance related env variables for hooks.

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @param memory: the memory size of the instance
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, mac, mode, link) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @param bep: the backend parameters for the instance
  @param hvp: the hypervisor parameters for the instance
  @type hypervisor_name: string
  @param hypervisor_name: the hypervisor for the instance
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
    "INSTANCE_HYPERVISOR": hypervisor_name,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, mac, mode, link) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_MAC" % idx] = mac
      env["INSTANCE_NIC%d_MODE" % idx] = mode
      env["INSTANCE_NIC%d_LINK" % idx] = link
      if mode == constants.NIC_MODE_BRIDGED:
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  for source, kind in [(bep, "BE"), (hvp, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = value

  return env


def _NICListToTuple(lu, nics):
  """Build a list of nic information tuples.

  This list is suitable to be passed to _BuildInstanceHookEnv or as a return
  value in LUQueryInstanceData.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nics: list of L{objects.NIC}
  @param nics: list of nics to convert to hooks tuples

  """
  hooks_nics = []
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
  for nic in nics:
    ip = nic.ip
    mac = nic.mac
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
    mode = filled_params[constants.NIC_MODE]
    link = filled_params[constants.NIC_LINK]
    hooks_nics.append((ip, mac, mode, link))
  return hooks_nics


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  cluster = lu.cfg.GetClusterInfo()
  bep = cluster.FillBE(instance)
  hvp = cluster.FillHV(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': _NICListToTuple(lu, instance.nics),
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
    'bep': bep,
    'hvp': hvp,
    'hypervisor_name': instance.hypervisor,
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142


def _AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
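

# Worked example (illustrative): with candidate_pool_size = 10 and three
# current candidates (mc_now = 3, mc_should = 3), adding this node bumps
# mc_should to min(3 + 1, 10) = 4; since 3 < 4 the node promotes itself.
# On a cluster already at the pool limit, mc_should stays capped at cp_size
# and no self-promotion happens.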


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                           profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise("Error checking bridges on destination node '%s'" %
                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


def _CheckOSVariant(os_obj, name):
  """Check whether an OS name conforms to the os variants specification.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type name: string
  @param name: OS name passed by the user, to check for validity

  """
  if not os_obj.supported_variants:
    return
  try:
    variant = name.split("+", 1)[1]
  except IndexError:
    raise errors.OpPrereqError("OS name must include a variant",
                               errors.ECODE_INVAL)

  if variant not in os_obj.supported_variants:
    raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
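

# Illustrative sketch: for a variant-aware OS the user-supplied name carries
# the variant after a '+', e.g. "debootstrap+lenny" selects variant "lenny".
# A plain "debootstrap" would be rejected here, since a variant is mandatory
# once the OS declares supported_variants.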


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodeInstances(cfg, node_name):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)


def _GetNodePrimaryInstances(cfg, node_name):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_name):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_name in inst.secondary_nodes)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir()]]

  return []


def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


class LUPostInitCluster(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    mn = self.cfg.GetMasterNode()
    return env, [], [mn]

  def CheckPrereq(self):
    """No prerequisites to check.

    """
    return True

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class LUDestroyCluster(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {"OP_TARGET": self.cfg.GetClusterName()}
    return env, [], []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Run post hooks on master node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % master)

    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    if modify_ssh_setup:
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      utils.CreateBackup(priv_key)
      utils.CreateBackup(pub_key)

    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"]
  REQ_BGL = False

  TCLUSTER = "cluster"
  TNODE = "node"
  TINSTANCE = "instance"

  ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
  EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
  EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
  EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
  EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
  ENODEDRBD = (TNODE, "ENODEDRBD")
  ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
  ENODEHOOKS = (TNODE, "ENODEHOOKS")
  ENODEHV = (TNODE, "ENODEHV")
  ENODELVM = (TNODE, "ENODELVM")
  ENODEN1 = (TNODE, "ENODEN1")
  ENODENET = (TNODE, "ENODENET")
  ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE")
  ENODEORPHANLV = (TNODE, "ENODEORPHANLV")
  ENODERPC = (TNODE, "ENODERPC")
  ENODESSH = (TNODE, "ENODESSH")
  ENODEVERSION = (TNODE, "ENODEVERSION")
  ENODESETUP = (TNODE, "ENODESETUP")
  ENODETIME = (TNODE, "ENODETIME")

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes:
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg)
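
  # Illustrative sketch (not part of the original module): for
  # ecode = self.ENODELVM, item = "node1.example.tld" and
  # msg = "LVM problem on node", the two output flavours would be roughly:
  #
  #   with error_codes:
  #     ERROR:ENODELVM:node:node1.example.tld:LVM problem on node
  #   without error_codes:
  #     ERROR: node node1.example.tld: LVM problem on node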

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    cond = bool(cond) or self.op.debug_simulate_errors
    if cond:
      self._Error(*args, **kwargs)
    # do not mark the operation as failed for WARN cases only
    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
      self.bad = self.bad or cond

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, master_files, drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param master_files: list of files that only masters should have
    @param drbd_map: the used DRBD minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103

    # main result, node_result should be a non-empty dict
    test = not node_result or not isinstance(node_result, dict)
    _ErrorIf(test, self.ENODERPC, node,
             "unable to verify node: no data returned")
    if test:
      return

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, self.ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return

    test = local_version != remote_version[0]
    _ErrorIf(test, self.ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  self.ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      test = vglist is None
      _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
      if not test:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)

    # checks config file checksum
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    test = not isinstance(remote_cksum, dict)
    _ErrorIf(test, self.ENODEFILECHECK, node,
             "node hasn't returned file checksum data")
    if not test:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have = (file_name not in master_files) or node_is_mc
        # file missing on the node
        test1 = file_name not in remote_cksum
        # file present but with wrong checksum
        test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
        # file present and matching
        test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
        _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
                 "file '%s' missing", file_name)
        _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
                 "file '%s' has wrong checksum", file_name)
        # not candidate and this is not a must-have file
        _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
                 "file '%s' should not exist on non master"
                 " candidates (and the file is outdated)", file_name)
        # all good, except non-master/non-must have combination
        _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
                 "file '%s' should not exist"
                 " on non master candidates", file_name)

    # checks ssh to other nodes
    test = constants.NV_NODELIST not in node_result
    _ErrorIf(test, self.ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if node_result[constants.NV_NODELIST]:
        for a_node, a_msg in node_result[constants.NV_NODELIST].items():
          _ErrorIf(True, self.ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in node_result
    _ErrorIf(test, self.ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if node_result[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, self.ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, node_result[constants.NV_NODENETTEST][anode])

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, self.ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      test = not isinstance(used_minors, (tuple, list))
      _ErrorIf(test, self.ENODEDRBD, node,
               "cannot parse drbd status file: %s", str(used_minors))
      if not test:
        for minor, (iname, must_exist) in drbd_map.items():
          test = minor not in used_minors and must_exist
          _ErrorIf(test, self.ENODEDRBD, node,
                   "drbd minor %d of instance %s is not active",
                   minor, iname)
        for minor in used_minors:
          test = minor not in drbd_map
          _ErrorIf(test, self.ENODEDRBD, node,
                   "unallocated drbd minor %d is in use", minor)
    test = node_result.get(constants.NV_NODESETUP,
                           ["Missing NODESETUP results"])
    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    # check pv names
    if vg_name is not None:
      pvlist = node_result.get(constants.NV_PVLIST, None)
      test = pvlist is None
      _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
      if not test:
        # check that ':' is not present in PV names, since it's a
        # special character for lvcreate (denotes the range of PEs to
        # use on the PV)
        for _, pvname, owner_vg in pvlist:
          test = ":" in pvname
          _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                   " '%s' of VG '%s'", pvname, owner_vg)

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        test = node not in node_vol_is or volume not in node_vol_is[node]
        _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if instanceconfig.admin_up:
      test = ((node_current not in node_instance or
               not instance in node_instance[node_current]) and
              node_current not in n_offline)
      _ErrorIf(test, self.EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               node_current)

    for node in node_instance:
      if (not node == node_current):
        test = instance in node_instance[node]
        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                 "instance should not run on node %s", node)

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    for node in node_vol_is:
      for volume in node_vol_is[node]:
        test = (node not in node_vol_should or
                volume not in node_vol_should[node])
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyOrphanInstances(self, instancelist, node_instance):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    for node in node_instance:
      for o_inst in node_instance[node]:
        test = o_inst not in instancelist
        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                      "instance %s on node %s should not exist", o_inst, node)

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        test = nodeinfo['mfree'] < needed_mem
        self._ErrorIf(test, self.ENODEN1, node,
                      "not enough memory to accommodate"
                      " failovers should peer node %s fail", prinode)

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified",
                                 errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just ran in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
    }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      _ErrorIf(True, self.ECLUSTERCFG, None, msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
    }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]
      node_verify_param[constants.NV_DRBDLIST] = None

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Verifying node status")
    for node_i in nodeinfo:
      node = node_i.name

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
      if msg:
        continue

      nresult = all_nvinfo[node].payload
      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        test = instance not in instanceinfo
        _ErrorIf(test, self.ECLUSTERCFG, None,
                 "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
        if test:
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)

      self._VerifyNode(node_i, file_names, local_checksums,
                       nresult, master_files, node_drbd, vg_name)

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
                 utils.SafeEncode(lvdata))
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      test = not isinstance(idata, list)
      _ErrorIf(test, self.ENODEHV, node,
               "rpc call to node failed (instancelist)")
      if test:
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      test = not isinstance(nodeinfo, dict)
      _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
      if test:
        continue

      # Node time
      ntime = nresult.get(constants.NV_TIME, None)
      try:
        ntime_merged = utils.MergeTime(ntime)
      except (ValueError, TypeError):
        _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
        continue

      if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
        ntime_diff = abs(nvinfo_starttime - ntime_merged)
      elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
        ntime_diff = abs(ntime_merged - nvinfo_endtime)
      else:
        ntime_diff = None

      _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
               "Node time diverges by at least %0.1fs from master node time",
               ntime_diff)

      if ntime_diff is not None:
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          test = (constants.NV_VGLIST not in nresult or
                  vg_name not in nresult[constants.NV_VGLIST])
          _ErrorIf(test, self.ENODELVM, node,
                   "node didn't return data for the volume group '%s'"
                   " - it is either missing or broken", vg_name)
          if test:
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        _ErrorIf(True, self.ENODERPC, node,
                 "node returned invalid nodeinfo, check lvm/hypervisor")
        continue

    node_vol_should = {}

    feedback_fn("* Verifying instance status")
    for instance in instancelist:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      self._VerifyInstance(instance, inst_config, node_volume,
                           node_instance, n_offline)
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      _ErrorIf(pnode not in node_info and pnode not in n_offline,
               self.ENODERPC, pnode, "instance %s, connection to"
               " primary node failed", instance)
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      _ErrorIf(len(inst_config.secondary_nodes) > 1,
               self.EINSTANCELAYOUT, instance,
               "instance has multiple secondary nodes", code="WARNING")

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        _ErrorIf(snode not in node_info and snode not in n_offline,
                 self.ENODERPC, snode,
                 "instance %s, connection to secondary node"
                 " failed", instance)

        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)

        if snode in n_offline:
          inst_nodes_offline.append(snode)

      # warn that the instance lives on offline nodes
      _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
               "instance lives on offline node(s) %s",
               utils.CommaJoin(inst_nodes_offline))

    feedback_fn("* Verifying orphan volumes")
    self._VerifyOrphanVolumes(node_vol_should, node_volume)

    feedback_fn("* Verifying remaining instances")
    self._VerifyOrphanInstances(instancelist, node_instance)

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_info, instance_cfg)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, self.ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave an error.
          # override manually lu_result here as _ErrorIf only
          # overrides self.bad
          lu_result = 1
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, self.ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = indent_re.sub('      ', output)
            feedback_fn("%s" % output)
            lu_result = 0

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    @rtype: tuple of three items
    @return: a tuple of (dict of node-to-node_error, list of instances
        which need activate-disks, dict of instance: (node, volume) for
        missing volumes)

    """
    result = res_nodes, res_instances, res_missing = {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_lv_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      node_res = node_lvs[node]
      if node_res.offline:
        continue
      msg = node_res.fail_msg
      if msg:
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
        res_nodes[node] = msg
        continue

      lvs = node_res.payload
      for lv_name, (_, _, lv_online) in lvs.items():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
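

# Illustrative sketch of the return value above (hypothetical names): a
# cluster where node2 could not be queried, inst1 needs activate-disks and
# inst2 has a volume missing on node3 would yield roughly:
#
#   ({"node2.example.tld": "Error enumerating LVs ..."},
#    ["inst1.example.tld"],
#    {"inst2.example.tld": [("node3.example.tld", "xenvg/disk0")]})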


class LURepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'",
                                 errors.ECODE_INVAL)

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = _ExpandInstanceName(self.cfg, name)
        self.wanted_names.append(full_name)
      self.needed_locks = {
        locking.LEVEL_NODE: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
      }
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,
      }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsizes(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsizes call to node"
                        " %s, ignoring", node)
        continue
      if len(result.data) != len(dskl):
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.data):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
    }
    mn = self.cfg.GetMasterNode()
    all_nodes = self.cfg.GetNodeList()
    return env, [mn], all_nodes

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.GetHostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)

    finally:
      result = self.rpc.call_node_start_master(master, False, False)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
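
# Illustrative sketch (not part of the original module): the recursive scan
# above on a toy device tree. _ToyDev is a hypothetical stand-in for
# objects.Disk and the string "lvm" plays the role of constants.LD_LV.
class _ToyDev(object):
  def __init__(self, dev_type, children=()):
    self.dev_type = dev_type
    self.children = list(children)

def _ToyIsLVMBased(dev):
  # any LVM device anywhere in the tree makes the whole disk LVM-based
  for child in dev.children:
    if _ToyIsLVMBased(child):
      return True
  return dev.dev_type == "lvm"

# a DRBD device backed by an LVM volume counts as LVM-based; a plain
# file-backed device does not
assert _ToyIsLVMBased(_ToyDev("drbd8", [_ToyDev("lvm")]))
assert not _ToyIsLVMBased(_ToyDev("file"))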
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except (ValueError, TypeError), err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err), errors.ECODE_INVAL)
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed",
                                   errors.ECODE_INVAL)
  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]
  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist",
                                       errors.ECODE_INVAL)

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)
2019 self.cluster = cluster = self.cfg.GetClusterInfo()
2020 # validate params changes
2021 if self.op.beparams:
2022 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
2023 self.new_beparams = objects.FillDict(
2024 cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
2026 if self.op.nicparams:
2027 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
2028 self.new_nicparams = objects.FillDict(
2029 cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed nic with no ip" %
                              (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors))
2055 # hypervisor list/parameters
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
                                   errors.ECODE_INVAL)
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      if not self.hv_list:
        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                                   " least one member",
                                   errors.ECODE_INVAL)
      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
      if invalid_hvs:
        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                                   " entries: %s" %
                                   utils.CommaJoin(invalid_hvs),
                                   errors.ECODE_INVAL)
    else:
      self.hv_list = cluster.enabled_hypervisors
2082 if self.op.hvparams or self.op.enabled_hypervisors is not None:
2083 # either the enabled list has changed, or the parameters have, validate
2084 for hv_name, hv_params in self.new_hvparams.items():
2085 if ((self.op.hvparams and hv_name in self.op.hvparams) or
2086 (self.op.enabled_hypervisors and
2087 hv_name in self.op.enabled_hypervisors)):
2088 # either this is a new hypervisor, or its parameters have changed
2089 hv_class = hypervisor.GetHypervisor(hv_name)
2090 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2091 hv_class.CheckParameterSyntax(hv_params)
2092 _CheckHVParams(self, node_list, hv_name, hv_params)
  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    self.cfg.Update(self.cluster, feedback_fn)
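
# Illustrative sketch (not part of the original module): the "defaults
# overlaid by custom values" merge that objects.FillDict performs in the
# beparams/nicparams/hvparams handling above. _ToyFillDict is a hypothetical
# stand-in with the same observable behaviour.
def _ToyFillDict(defaults_dict, custom_dict):
  ret_dict = dict(defaults_dict)
  ret_dict.update(custom_dict)
  return ret_dict

# a partial opcode input only overrides the keys it mentions
assert _ToyFillDict({"memory": 128, "vcpus": 1}, {"memory": 256}) == \
  {"memory": 256, "vcpus": 1}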
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to

  """
  # 1. Gather target nodes
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
  dist_nodes = lu.cfg.GetNodeList()
  if additional_nodes is not None:
    dist_nodes.extend(additional_nodes)
  if myself.name in dist_nodes:
    dist_nodes.remove(myself.name)

  # 2. Gather files to distribute
  dist_files = set([constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.RAPI_CERT_FILE,
                    constants.RAPI_USERS_FILE,
                    constants.HMAC_CLUSTER_KEY,
                   ])
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
  for hv_name in enabled_hypervisors:
    hv_class = hypervisor.GetHypervisor(hv_name)
    dist_files.update(hv_class.GetAncillaryFiles())

  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.fail_msg
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)
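
# Illustrative sketch (not part of the original module): the three-step
# pattern of _RedistributeAncillaryFiles with the RPC machinery replaced by
# hypothetical callables, so the node/file gathering can be read in
# isolation.
def _ToyRedistribute(all_nodes, master, files, exists_fn, upload_fn):
  # 1. target every node except ourselves (the master)
  targets = [name for name in all_nodes if name != master]
  # 2./3. upload each file that actually exists locally
  for fname in files:
    if exists_fn(fname):
      upload_fn(targets, fname)

_sent = []
_ToyRedistribute(["m", "n1", "n2"], "m", ["/etc/hosts"],
                 lambda fname: True,
                 lambda nodes, fname: _sent.append((tuple(nodes), fname)))
assert _sent == [(("n1", "n2"), "/etc/hosts")]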
class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)
def _WaitForSync(lu, instance, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    rstats = rstats.payload
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, mstat.sync_percent,
                         rem_time))

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)

  return not cumul_degraded
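
# Illustrative sketch (not part of the original module): the polling loop
# above in miniature. status_fn is a hypothetical stand-in for the
# blockdev_getmirrorstatus RPC and returns (done, degraded); the
# degraded-retry branch re-polls a few times so a transiently degraded
# mirror is not reported as a failure.
def _ToyWaitForSync(status_fn, sleep_fn, degr_retries=10):
  while True:
    done, degraded = status_fn()
    if done and degraded and degr_retries > 0:
      # done but degraded: retry a few times to see a stable situation
      degr_retries -= 1
      sleep_fn(1)
      continue
    if done:
      return not degraded
    sleep_fn(1)

# a transient degraded state on the second poll is retried away
_states = iter([(False, False), (True, True), (True, False)])
assert _ToyWaitForSync(lambda: next(_states), lambda _secs: None)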
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants")
  # Fields that need calculation of global os validity
  _FIELDS_NEEDVALID = frozenset(["valid", "variants"])

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported",
                                 errors.ECODE_INVAL)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and tuples of (path, status, diagnose) as values, eg::

          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
                                     (/srv/..., False, "invalid api")],
                           "node2": [(/srv/..., True, "")]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].fail_msg]
    for node_name, nr in rlist.items():
      if nr.fail_msg or not nr.payload:
        continue
      for name, path, status, diagnose, variants in nr.payload:
        if name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[name] = {}
          for nname in good_nodes:
            all_os[name][nname] = []
        all_os[name][node_name].append((path, status, diagnose, variants))
    return all_os
  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    pol = self._DiagnoseByOS(node_data)
    output = []
    calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields)
    calc_variants = "variants" in self.op.output_fields

    for os_name, os_data in pol.items():
      row = []
      if calc_valid:
        valid = True
        variants = None
        for osl in os_data.values():
          valid = valid and osl and osl[0][1]
          if not valid:
            variants = None
            break
          if calc_variants:
            node_variants = osl[0][3]
            if variants is None:
              variants = node_variants
            else:
              variants = [v for v in variants if v in node_variants]

      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = valid
        elif field == "node_status":
          # this is just a copy of the dict
          val = {}
          for node_name, nos_list in os_data.items():
            val[node_name] = nos_list
        elif field == "variants":
          val = variants
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
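
# Illustrative sketch (not part of the original module): the remapping done
# by _DiagnoseByOS above, with plain dicts instead of RPC result objects.
# The input maps node -> [(os_name, status)]; the output maps
# os_name -> node -> [status], pre-seeded with every good node so a missing
# entry shows up as an empty list rather than a KeyError.
def _ToyDiagnoseByOS(rlist):
  all_os = {}
  good_nodes = list(rlist)
  for node, oslist in rlist.items():
    for name, status in oslist:
      if name not in all_os:
        all_os[name] = dict((n, []) for n in good_nodes)
      all_os[name][node].append(status)
  return all_os

_pol = _ToyDiagnoseByOS({"node1": [("lenny", True)], "node2": []})
assert _pol == {"lenny": {"node1": [True], "node2": []}}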
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_name)
    except ValueError:
      logging.warning("Node %s which is about to be removed not found"
                      " in the all nodes list", self.op.node_name)
    return env, all_nodes, all_nodes
  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_name)
    assert node is not None

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.",
                                 errors.ECODE_INVAL)

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node
  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self, exceptions=[node.name])
    self.context.RemoveNode(node.name)

    # Run post hooks on the node before it's removed
    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
    try:
      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
    except:
      # pylint: disable-msg=W0702
      self.LogWarning("Errors occurred running hooks on %s" % node.name)

    result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable-msg=W0142
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False

  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
                    "master_candidate", "offline", "drained"]

  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(*[
    "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "master",
    "role"] + _SIMPLE_FIELDS
    )
  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass
  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]
2578 # begin data gathering
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.fail_msg and nodeinfo.payload:
          nodeinfo = nodeinfo.payload
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})
2605 node_to_primary = dict([(name, set()) for name in nodenames])
2606 node_to_secondary = dict([(name, set()) for name in nodenames])
2608 inst_fields = frozenset(("pinst_cnt", "pinst_list",
2609 "sinst_cnt", "sinst_list"))
2610 if inst_fields & frozenset(self.op.output_fields):
2611 inst_data = self.cfg.GetAllInstancesInfo()
2613 for inst in inst_data.values():
2614 if inst.primary_node in node_to_primary:
2615 node_to_primary[inst.primary_node].add(inst.name)
2616 for secnode in inst.secondary_nodes:
2617 if secnode in node_to_secondary:
2618 node_to_secondary[secnode].add(inst.name)
2620 master_node = self.cfg.GetMasterNode()
2622 # end data gathering
    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
2628 if field in self._SIMPLE_FIELDS:
2629 val = getattr(node, field)
2630 elif field == "pinst_list":
2631 val = list(node_to_primary[node.name])
2632 elif field == "sinst_list":
2633 val = list(node_to_secondary[node.name])
2634 elif field == "pinst_cnt":
2635 val = len(node_to_primary[node.name])
2636 elif field == "sinst_cnt":
2637 val = len(node_to_secondary[node.name])
2638 elif field == "pip":
2639 val = node.primary_ip
2640 elif field == "sip":
2641 val = node.secondary_ip
2642 elif field == "tags":
2643 val = list(node.GetTags())
2644 elif field == "master":
2645 val = node.name == master_node
2646 elif self._FIELDS_DYNAMIC.Matches(field):
2647 val = live_data[node.name].get(field, None)
        elif field == "role":
          if node.name == master_node:
            val = "M"
          elif node.master_candidate:
            val = "C"
          elif node.drained:
            val = "D"
          elif node.offline:
            val = "O"
          else:
            val = "R"
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")
2676 def ExpandNames(self):
2677 _CheckOutputFields(static=self._FIELDS_STATIC,
2678 dynamic=self._FIELDS_DYNAMIC,
2679 selected=self.op.output_fields)
2681 self.needed_locks = {}
2682 self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      nresult = volumes[node]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
        continue

      node_vols = nresult.payload[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
class LUQueryNodeStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _OP_REQP = ["nodes", "storage_type", "output_fields"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)

  def ExpandNames(self):
    storage_type = self.op.storage_type

    if storage_type not in constants.VALID_STORAGE_TYPES:
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                                 errors.ECODE_INVAL)

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.op.name = getattr(self.op, "name", None)

    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.nodes,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node in utils.NiceSort(self.nodes):
      nresult = data[node]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s", node, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result
class LUModifyNodeStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  _OP_REQP = ["node_name", "storage_type", "name", "changes"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)

    storage_type = self.op.storage_type
    if storage_type not in constants.VALID_STORAGE_TYPES:
      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_name,
      }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_name,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1
  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.GetHostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given",
                                 errors.ECODE_INVAL)
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node, errors.ECODE_EXISTS)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                 errors.ECODE_NOENT)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        continue

      if (existing_node.primary_ip == primary_ip or
2972 existing_node.secondary_ip == primary_ip or
2973 existing_node.primary_ip == secondary_ip or
2974 existing_node.secondary_ip == secondary_ip):
2975 raise errors.OpPrereqError("New node ip address(es) conflict with"
2976 " existing node %s" % existing_node.name,
2977 errors.ECODE_NOTUNIQUE)
2979 # check that the type of the node (single versus dual homed) is the
2980 # same as for the master
2981 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2982 master_singlehomed = myself.secondary_ip == myself.primary_ip
2983 newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)
2994 # checks reachability
2995 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2996 raise errors.OpPrereqError("Node not reachable by ping",
2997 errors.ECODE_ENVIRON)
2999 if not newbie_singlehomed:
3000 # check reachability from my secondary ip to newbie's secondary ip
3001 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
3002 source=myself.secondary_ip):
3003 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
3004 " based ping to noded port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [node]
    else:
      exceptions = []

    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)

    if self.op.readd:
      self.new_node = self.cfg.GetNodeInfo(node)
      assert self.new_node is not None, "Can't retrieve locked node %s" % node
    else:
      self.new_node = objects.Node(name=node,
                                   primary_ip=primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False)
  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      new_node.drained = new_node.offline = False # pylint: disable-msg=W0201
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      new_node.master_candidate = self.master_candidate
3041 # notify the user about any possible mc promotion
3042 if new_node.master_candidate:
3043 self.LogInfo("Node will be a master candidate")
3045 # check connectivity
3046 result = self.rpc.call_version([node])[node]
3047 result.Raise("Can't get version information from node %s" % node)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node, result.payload)
    else:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result.payload))
    if self.cfg.GetClusterInfo().modify_ssh_setup:
      logging.info("Copy ssh key to node %s", node)
      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
      keyarray = []
      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                  priv_key, pub_key]

      for i in keyfiles:
        keyarray.append(utils.ReadFile(i))

      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                      keyarray[2], keyarray[3], keyarray[4],
                                      keyarray[5])
      result.Raise("Cannot transfer ssh keys to the new node")
3073 # Add node to our /etc/hosts, and add key to known_hosts
3074 if self.cfg.GetClusterInfo().modify_etc_hosts:
3075 utils.AddHostToEtcHosts(new_node.name)
3077 if new_node.secondary_ip != new_node.primary_ip:
3078 result = self.rpc.call_node_has_ip_address(new_node.name,
3079 new_node.secondary_ip)
3080 result.Raise("Failure checking secondary ip on node %s" % new_node.name,
3081 prereq=True, ecode=errors.ECODE_ENVIRON)
3082 if not result.payload:
3083 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
3084 " you gave (%s). Please fix and re-run this"
3085 " command." % new_node.secondary_ip)
3087 node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: [node],
      # TODO: do a node-net-test as well?
      }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")
    if self.op.readd:
      _RedistributeAncillaryFiles(self)
      self.context.ReaddNode(new_node)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.name)
        msg = result.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself from master"
                          " candidate status: %s" % msg)
    else:
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
      self.context.AddNode(new_node, self.proc.GetECId())
class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)
3145 def ExpandNames(self):
3146 self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via masterfailover",
                                   errors.ECODE_INVAL)
3181 # Boolean value that tells us whether we're offlining or draining the node
3182 offline_or_drain = self.op.offline == True or self.op.drained == True
3183 deoffline_or_drain = self.op.offline == False or self.op.drained == False
3185 if (node.master_candidate and
3186 (self.op.master_candidate == False or offline_or_drain)):
3187 cp_size = self.cfg.GetClusterInfo().candidate_pool_size
3188 mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
3189 if mc_now <= cp_size:
3190 msg = ("Not enough master candidates (desired"
3191 " %d, new value will be %d)" % (cp_size, mc_now-1))
3192 # Only allow forcing the operation if it's an offline/drain operation,
3193 # and we could not possibly promote more nodes.
3194 # FIXME: this can still lead to issues if in any way another node which
3195 # could be promoted appears in the meantime.
        if self.op.force and offline_or_drain and mc_should == mc_max:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name,
                                 errors.ECODE_INVAL)
3208 # If we're being deofflined/drained, we'll MC ourself if needed
3209 if (deoffline_or_drain and not offline_or_drain and not
3210 self.op.master_candidate == True and not node.master_candidate):
3211 self.op.master_candidate = _DecideSelfPromotion(self)
3212 if self.op.master_candidate:
3213 self.LogInfo("Autopromoting node to master candidate")
  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
          rrc = self.rpc.call_node_demote_from_mc(node.name)
          msg = rrc.fail_msg
          if msg:
            self.LogWarning("Node failed to demote itself: %s" % msg)
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)
    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result
class LUPowercycleNode(NoHooksLU):
  """Powercycles a node.

  """
  _OP_REQP = ["node_name", "force"]
  REQ_BGL = False

  def CheckArguments(self):
    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This LU has no prereqs.

    """

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    result = self.rpc.call_node_powercycle(self.op.node_name,
                                           self.cfg.GetHypervisorType())
    result.Raise("Failed to schedule the reboot")
    return result.payload
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.enabled_hypervisors[0],
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "nicparams": cluster.nicparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "volume_group_name": cluster.volume_group_name,
      "file_storage_dir": cluster.file_storage_dir,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      }

    return result
class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
                                  "watcher_pause")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)
  def CheckPrereq(self):
    """No prerequisites.

    """

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      elif field == "watcher_pause":
        # assigning (not returning) here, so all requested fields are
        # collected into the values list
        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3418 def DeclareLocks(self, level):
3419 if level == locking.LEVEL_NODE:
3420 self._LockInstancesNodes()
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)
    if not hasattr(self.op, "ignore_size"):
      self.op.ignore_size = False

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
      _AssembleInstanceDisks(self, self.instance,
                             ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.fail_msg
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
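
# Illustrative sketch (not part of the original module): the two-pass
# activation order used above. Assembling every disk on all nodes as
# secondary first, and only then promoting on the primary, narrows the
# window in which DRBD could go primary before its peers have handshaked.
# assemble_fn is a hypothetical stand-in for the blockdev_assemble RPC.
def _ToyAssembleDisks(disks, nodes, primary, assemble_fn):
  for disk in disks:
    for node in nodes:
      # pass 1: everywhere, secondary mode
      assemble_fn(node, disk, as_primary=False)
  for disk in disks:
    # pass 2: primary node only
    assemble_fn(primary, disk, as_primary=True)

_calls = []
_ToyAssembleDisks(["disk0"], ["nodeA", "nodeB"], "nodeA",
                  lambda node, disk, as_primary:
                    _calls.append((node, disk, as_primary)))
assert _calls[0] == ("nodeA", "disk0", False)
assert _calls[-1] == ("nodeA", "disk0", True)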
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                       ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)
def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  pnode = instance.primary_node
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
  ins_l.Raise("Can't contact node %s" % pnode)

  if instance.name in ins_l.payload:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)
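
# Illustrative sketch (not part of the original module): the guard
# implemented by _SafeShutdownInstanceDisks above. running_fn and
# shutdown_fn are hypothetical stand-ins for the instance_list RPC and the
# actual disk shutdown.
def _ToySafeShutdown(name, running_fn, shutdown_fn):
  if name in running_fn():
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")
  shutdown_fn()

_down = []
_ToySafeShutdown("inst1", lambda: ["other-instance"],
                 lambda: _down.append(True))
assert _down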
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If the ignore_primary is false, errors on the primary node are
  ignored.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise("Can't get data from node %s" % node,
                       prereq=True, ecode=errors.ECODE_ENVIRON)
  free_mem = nodeinfo[node].payload.get('memory_free', None)
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem),
                               errors.ECODE_ENVIRON)
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem),
                               errors.ECODE_NORES)
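
# Illustrative sketch (not part of the original module): the free-memory
# precondition above with the RPC replaced by a plain value, showing the
# two failure modes (unusable data, insufficient memory).
def _ToyCheckFreeMemory(free_mem, requested):
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory, result was '%s'" %
                               (free_mem, ), errors.ECODE_ENVIRON)
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory: needed %d MiB,"
                               " available %d MiB" % (requested, free_mem),
                               errors.ECODE_NORES)

_ToyCheckFreeMemory(2048, 512)  # enough memory: passes silently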
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # extra beparams
    self.beparams = getattr(self.op, "beparams", {})
    if self.beparams:
      if not isinstance(self.beparams, dict):
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
                                   " dict" % (type(self.beparams), ),
                                   errors.ECODE_INVAL)
      # fill the beparams dict
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
      self.op.beparams = self.beparams

    # extra hvparams
    self.hvparams = getattr(self.op, "hvparams", {})
    if self.hvparams:
      if not isinstance(self.hvparams, dict):
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
                                   " dict" % (type(self.hvparams), ),
                                   errors.ECODE_INVAL)

      # check hypervisor parameter syntax (locally)
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
                                    instance.hvparams)
      filled_hvp.update(self.hvparams)
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
      hv_type.CheckParameterSyntax(filled_hvp)
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
      self.op.hvparams = self.hvparams

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise("Error checking node %s" % instance.primary_node,
                      prereq=True, ecode=errors.ECODE_ENVIRON)
    if not remote_info.payload: # not running already
      _CheckNodeFreeMemory(self, instance.primary_node,
                           "starting instance %s" % instance.name,
                           bep[constants.BE_MEMORY], instance.hypervisor)
  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance,
                                          self.hvparams, self.beparams)
    msg = result.fail_msg
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                    constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)
  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type,
                                             self.shutdown_timeout)
      result.Raise("Could not reboot instance")
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance,
                                               self.shutdown_timeout)
      result.Raise("Could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.fail_msg
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.timeout = getattr(self.op, "timeout",
                           constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["TIMEOUT"] = self.timeout
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl
3881 def CheckPrereq(self):
3882 """Check prerequisites.
3884 This checks that the instance is in the cluster.
3887 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3888 assert self.instance is not None, \
3889 "Cannot retrieve locked instance %s" % self.op.instance_name
3890 _CheckNodeOnline(self, self.instance.primary_node)
3892 def Exec(self, feedback_fn):
3893 """Shutdown the instance.
3895 """
3896 instance = self.instance
3897 node_current = instance.primary_node
3898 timeout = self.timeout
3899 self.cfg.MarkInstanceDown(instance.name)
3900 result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
3901 msg = result.fail_msg
3902 if msg:
3903 self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3905 _ShutdownInstanceDisks(self, instance)
3908 class LUReinstallInstance(LogicalUnit):
3909 """Reinstall an instance.
3911 """
3912 HPATH = "instance-reinstall"
3913 HTYPE = constants.HTYPE_INSTANCE
3914 _OP_REQP = ["instance_name"]
3916 REQ_BGL = False
3917 def ExpandNames(self):
3918 self._ExpandAndLockInstance()
3920 def BuildHooksEnv(self):
3921 """Build hooks env.
3923 This runs on master, primary and secondary nodes of the instance.
3925 """
3926 env = _BuildInstanceHookEnvByObject(self, self.instance)
3927 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3928 return env, nl, nl
3930 def CheckPrereq(self):
3931 """Check prerequisites.
3933 This checks that the instance is in the cluster and is not running.
3935 """
3936 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3937 assert instance is not None, \
3938 "Cannot retrieve locked instance %s" % self.op.instance_name
3939 _CheckNodeOnline(self, instance.primary_node)
3941 if instance.disk_template == constants.DT_DISKLESS:
3942 raise errors.OpPrereqError("Instance '%s' has no disks" %
3943 self.op.instance_name,
3944 errors.ECODE_INVAL)
3945 if instance.admin_up:
3946 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3947 self.op.instance_name,
3948 errors.ECODE_STATE)
3949 remote_info = self.rpc.call_instance_info(instance.primary_node,
3950 instance.name,
3951 instance.hypervisor)
3952 remote_info.Raise("Error checking node %s" % instance.primary_node,
3953 prereq=True, ecode=errors.ECODE_ENVIRON)
3954 if remote_info.payload:
3955 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3956 (self.op.instance_name,
3957 instance.primary_node),
3958 errors.ECODE_STATE)
3960 self.op.os_type = getattr(self.op, "os_type", None)
3961 self.op.force_variant = getattr(self.op, "force_variant", False)
3962 if self.op.os_type is not None:
3964 pnode = _ExpandNodeName(self.cfg, instance.primary_node)
3965 result = self.rpc.call_os_get(pnode, self.op.os_type)
3966 result.Raise("OS '%s' not in supported OS list for primary node %s" %
3967 (self.op.os_type, pnode),
3968 prereq=True, ecode=errors.ECODE_INVAL)
3969 if not self.op.force_variant:
3970 _CheckOSVariant(result.payload, self.op.os_type)
3972 self.instance = instance
3974 def Exec(self, feedback_fn):
3975 """Reinstall the instance.
3977 """
3978 inst = self.instance
3980 if self.op.os_type is not None:
3981 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3982 inst.os = self.op.os_type
3983 self.cfg.Update(inst, feedback_fn)
3985 _StartInstanceDisks(self, inst, None)
3986 try:
3987 feedback_fn("Running the instance OS create scripts...")
3988 # FIXME: pass debug option from opcode to backend
3989 result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
3990 self.op.debug_level)
3991 result.Raise("Could not install OS for instance %s on node %s" %
3992 (inst.name, inst.primary_node))
3993 finally:
3994 _ShutdownInstanceDisks(self, inst)
3997 class LURecreateInstanceDisks(LogicalUnit):
3998 """Recreate an instance's missing disks.
4000 """
4001 HPATH = "instance-recreate-disks"
4002 HTYPE = constants.HTYPE_INSTANCE
4003 _OP_REQP = ["instance_name", "disks"]
4005 REQ_BGL = False
4006 def CheckArguments(self):
4007 """Check the arguments.
4009 """
4010 if not isinstance(self.op.disks, list):
4011 raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL)
4012 for item in self.op.disks:
4013 if (not isinstance(item, int) or
4014 item < 0):
4015 raise errors.OpPrereqError("Invalid disk specification '%s'" %
4016 str(item), errors.ECODE_INVAL)
4018 def ExpandNames(self):
4019 self._ExpandAndLockInstance()
4021 def BuildHooksEnv(self):
4022 """Build hooks env.
4024 This runs on master, primary and secondary nodes of the instance.
4026 """
4027 env = _BuildInstanceHookEnvByObject(self, self.instance)
4028 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4029 return env, nl, nl
4031 def CheckPrereq(self):
4032 """Check prerequisites.
4034 This checks that the instance is in the cluster and is not running.
4036 """
4037 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4038 assert instance is not None, \
4039 "Cannot retrieve locked instance %s" % self.op.instance_name
4040 _CheckNodeOnline(self, instance.primary_node)
4042 if instance.disk_template == constants.DT_DISKLESS:
4043 raise errors.OpPrereqError("Instance '%s' has no disks" %
4044 self.op.instance_name, errors.ECODE_INVAL)
4045 if instance.admin_up:
4046 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4047 self.op.instance_name, errors.ECODE_STATE)
4048 remote_info = self.rpc.call_instance_info(instance.primary_node,
4049 instance.name,
4050 instance.hypervisor)
4051 remote_info.Raise("Error checking node %s" % instance.primary_node,
4052 prereq=True, ecode=errors.ECODE_ENVIRON)
4053 if remote_info.payload:
4054 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4055 (self.op.instance_name,
4056 instance.primary_node), errors.ECODE_STATE)
4058 if not self.op.disks:
4059 self.op.disks = range(len(instance.disks))
4061 for idx in self.op.disks:
4062 if idx >= len(instance.disks):
4063 raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
4064 errors.ECODE_INVAL)
4066 self.instance = instance
4068 def Exec(self, feedback_fn):
4069 """Recreate the disks.
4071 """
4072 to_skip = []
4073 for idx, _ in enumerate(self.instance.disks):
4074 if idx not in self.op.disks: # disk idx has not been passed in
4075 to_skip.append(idx)
4078 _CreateDisks(self, self.instance, to_skip=to_skip)
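# Worked example (illustrative values): for an instance with three disks
# and self.op.disks == [1], the loop above computes to_skip == [0, 2], so
# _CreateDisks recreates only disk 1.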
4081 class LURenameInstance(LogicalUnit):
4082 """Rename an instance.
4084 """
4085 HPATH = "instance-rename"
4086 HTYPE = constants.HTYPE_INSTANCE
4087 _OP_REQP = ["instance_name", "new_name"]
4089 def BuildHooksEnv(self):
4090 """Build hooks env.
4092 This runs on master, primary and secondary nodes of the instance.
4094 """
4095 env = _BuildInstanceHookEnvByObject(self, self.instance)
4096 env["INSTANCE_NEW_NAME"] = self.op.new_name
4097 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
4098 return env, nl, nl
4100 def CheckPrereq(self):
4101 """Check prerequisites.
4103 This checks that the instance is in the cluster and is not running.
4105 """
4106 self.op.instance_name = _ExpandInstanceName(self.cfg,
4107 self.op.instance_name)
4108 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4109 assert instance is not None
4110 _CheckNodeOnline(self, instance.primary_node)
4112 if instance.admin_up:
4113 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
4114 self.op.instance_name, errors.ECODE_STATE)
4115 remote_info = self.rpc.call_instance_info(instance.primary_node,
4116 instance.name,
4117 instance.hypervisor)
4118 remote_info.Raise("Error checking node %s" % instance.primary_node,
4119 prereq=True, ecode=errors.ECODE_ENVIRON)
4120 if remote_info.payload:
4121 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
4122 (self.op.instance_name,
4123 instance.primary_node), errors.ECODE_STATE)
4124 self.instance = instance
4126 # new name verification
4127 name_info = utils.GetHostInfo(self.op.new_name)
4129 self.op.new_name = new_name = name_info.name
4130 instance_list = self.cfg.GetInstanceList()
4131 if new_name in instance_list:
4132 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4133 new_name, errors.ECODE_EXISTS)
4135 if not getattr(self.op, "ignore_ip", False):
4136 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
4137 raise errors.OpPrereqError("IP %s of instance %s already in use" %
4138 (name_info.ip, new_name),
4139 errors.ECODE_NOTUNIQUE)
4142 def Exec(self, feedback_fn):
4143 """Rename the instance.
4145 """
4146 inst = self.instance
4147 old_name = inst.name
4149 if inst.disk_template == constants.DT_FILE:
4150 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4152 self.cfg.RenameInstance(inst.name, self.op.new_name)
4153 # Change the instance lock. This is definitely safe while we hold the BGL
4154 self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
4155 self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
4157 # re-read the instance from the configuration after rename
4158 inst = self.cfg.GetInstanceInfo(self.op.new_name)
4160 if inst.disk_template == constants.DT_FILE:
4161 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
4162 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
4163 old_file_storage_dir,
4164 new_file_storage_dir)
4165 result.Raise("Could not rename on node %s directory '%s' to '%s'"
4166 " (but the instance has been renamed in Ganeti)" %
4167 (inst.primary_node, old_file_storage_dir,
4168 new_file_storage_dir))
4170 _StartInstanceDisks(self, inst, None)
4171 try:
4172 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
4173 old_name, self.op.debug_level)
4174 msg = result.fail_msg
4175 if msg:
4176 msg = ("Could not run OS rename script for instance %s on node %s"
4177 " (but the instance has been renamed in Ganeti): %s" %
4178 (inst.name, inst.primary_node, msg))
4179 self.proc.LogWarning(msg)
4180 finally:
4181 _ShutdownInstanceDisks(self, inst)
4184 class LURemoveInstance(LogicalUnit):
4185 """Remove an instance.
4187 """
4188 HPATH = "instance-remove"
4189 HTYPE = constants.HTYPE_INSTANCE
4190 _OP_REQP = ["instance_name", "ignore_failures"]
4192 REQ_BGL = False
4193 def CheckArguments(self):
4194 """Check the arguments.
4196 """
4197 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4198 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4200 def ExpandNames(self):
4201 self._ExpandAndLockInstance()
4202 self.needed_locks[locking.LEVEL_NODE] = []
4203 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4205 def DeclareLocks(self, level):
4206 if level == locking.LEVEL_NODE:
4207 self._LockInstancesNodes()
4209 def BuildHooksEnv(self):
4210 """Build hooks env.
4212 This runs on master, primary and secondary nodes of the instance.
4214 """
4215 env = _BuildInstanceHookEnvByObject(self, self.instance)
4216 env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
4217 nl = [self.cfg.GetMasterNode()]
4218 nl_post = list(self.instance.all_nodes) + nl
4219 return env, nl, nl_post
4221 def CheckPrereq(self):
4222 """Check prerequisites.
4224 This checks that the instance is in the cluster.
4226 """
4227 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4228 assert self.instance is not None, \
4229 "Cannot retrieve locked instance %s" % self.op.instance_name
4231 def Exec(self, feedback_fn):
4232 """Remove the instance.
4234 """
4235 instance = self.instance
4236 logging.info("Shutting down instance %s on node %s",
4237 instance.name, instance.primary_node)
4239 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
4240 self.shutdown_timeout)
4241 msg = result.fail_msg
4242 if msg:
4243 if self.op.ignore_failures:
4244 feedback_fn("Warning: can't shutdown instance: %s" % msg)
4245 else:
4246 raise errors.OpExecError("Could not shutdown instance %s on"
4247 " node %s: %s" %
4248 (instance.name, instance.primary_node, msg))
4250 logging.info("Removing block devices for instance %s", instance.name)
4252 if not _RemoveDisks(self, instance):
4253 if self.op.ignore_failures:
4254 feedback_fn("Warning: can't remove instance's disks")
4255 else:
4256 raise errors.OpExecError("Can't remove instance's disks")
4258 logging.info("Removing instance %s out of cluster config", instance.name)
4260 self.cfg.RemoveInstance(instance.name)
4261 self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
4264 class LUQueryInstances(NoHooksLU):
4265 """Logical unit for querying instances.
4267 """
4268 # pylint: disable-msg=W0142
4269 _OP_REQP = ["output_fields", "names", "use_locking"]
4271 _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
4272 "serial_no", "ctime", "mtime", "uuid"]
4273 _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
4274 "admin_state",
4275 "disk_template", "ip", "mac", "bridge",
4276 "nic_mode", "nic_link",
4277 "sda_size", "sdb_size", "vcpus", "tags",
4278 "network_port", "beparams",
4279 r"(disk)\.(size)/([0-9]+)",
4280 r"(disk)\.(sizes)", "disk_usage",
4281 r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
4282 r"(nic)\.(bridge)/([0-9]+)",
4283 r"(nic)\.(macs|ips|modes|links|bridges)",
4284 r"(disk|nic)\.(count)",
4285 "hvparams",
4286 ] + _SIMPLE_FIELDS +
4287 ["hv/%s" % name
4288 for name in constants.HVS_PARAMETERS
4289 if name not in constants.HVC_GLOBALS] +
4290 ["be/%s" % name
4291 for name in constants.BES_PARAMETERS])
4292 _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
4294 REQ_BGL = False
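# Examples of field names matched by the patterns above (illustrative):
#   "disk.size/0" -> groups ("disk", "size", "0"): size of the first disk
#   "nic.mac/1"   -> groups ("nic", "mac", "1"): MAC of the second NIC
#   "nic.bridges" -> the per-NIC list variant, one entry per NIC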
4295 def ExpandNames(self):
4296 _CheckOutputFields(static=self._FIELDS_STATIC,
4297 dynamic=self._FIELDS_DYNAMIC,
4298 selected=self.op.output_fields)
4300 self.needed_locks = {}
4301 self.share_locks[locking.LEVEL_INSTANCE] = 1
4302 self.share_locks[locking.LEVEL_NODE] = 1
4304 if self.op.names:
4305 self.wanted = _GetWantedInstances(self, self.op.names)
4306 else:
4307 self.wanted = locking.ALL_SET
4309 self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
4310 self.do_locking = self.do_node_query and self.op.use_locking
4311 if self.do_locking:
4312 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
4313 self.needed_locks[locking.LEVEL_NODE] = []
4314 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4316 def DeclareLocks(self, level):
4317 if level == locking.LEVEL_NODE and self.do_locking:
4318 self._LockInstancesNodes()
4320 def CheckPrereq(self):
4321 """Check prerequisites.
4323 """
4326 def Exec(self, feedback_fn):
4327 """Computes the list of instances and their attributes.
4329 """
4330 # pylint: disable-msg=R0912
4331 # way too many branches here
4332 all_info = self.cfg.GetAllInstancesInfo()
4333 if self.wanted == locking.ALL_SET:
4334 # caller didn't specify instance names, so ordering is not important
4335 if self.do_locking:
4336 instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4337 else:
4338 instance_names = all_info.keys()
4339 instance_names = utils.NiceSort(instance_names)
4340 else:
4341 # caller did specify names, so we must keep the ordering
4342 if self.do_locking:
4343 tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
4344 else:
4345 tgt_set = all_info.keys()
4346 missing = set(self.wanted).difference(tgt_set)
4347 if missing:
4348 raise errors.OpExecError("Some instances were removed before"
4349 " retrieving their data: %s" % missing)
4350 instance_names = self.wanted
4352 instance_list = [all_info[iname] for iname in instance_names]
4354 # begin data gathering
4356 nodes = frozenset([inst.primary_node for inst in instance_list])
4357 hv_list = list(set([inst.hypervisor for inst in instance_list]))
4359 bad_nodes = []
4360 off_nodes = []
4361 if self.do_node_query:
4362 live_data = {}
4363 node_data = self.rpc.call_all_instances_info(nodes, hv_list)
4364 for name in nodes:
4365 result = node_data[name]
4366 if result.offline:
4367 # offline nodes will be in both lists
4368 off_nodes.append(name)
4369 if result.fail_msg:
4370 bad_nodes.append(name)
4371 else:
4372 if result.payload:
4373 live_data.update(result.payload)
4374 # else no instance is alive
4375 else:
4376 live_data = dict([(name, {}) for name in instance_names])
4378 # end data gathering
4380 HVPREFIX = "hv/"
4381 BEPREFIX = "be/"
4382 output = []
4383 cluster = self.cfg.GetClusterInfo()
4384 for instance in instance_list:
4385 iout = []
4386 i_hv = cluster.FillHV(instance, skip_globals=True)
4387 i_be = cluster.FillBE(instance)
4388 i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4389 nic.nicparams) for nic in instance.nics]
4390 for field in self.op.output_fields:
4391 st_match = self._FIELDS_STATIC.Matches(field)
4392 if field in self._SIMPLE_FIELDS:
4393 val = getattr(instance, field)
4394 elif field == "pnode":
4395 val = instance.primary_node
4396 elif field == "snodes":
4397 val = list(instance.secondary_nodes)
4398 elif field == "admin_state":
4399 val = instance.admin_up
4400 elif field == "oper_state":
4401 if instance.primary_node in bad_nodes:
4402 val = None
4403 else:
4404 val = bool(live_data.get(instance.name))
4405 elif field == "status":
4406 if instance.primary_node in off_nodes:
4407 val = "ERROR_nodeoffline"
4408 elif instance.primary_node in bad_nodes:
4409 val = "ERROR_nodedown"
4410 else:
4411 running = bool(live_data.get(instance.name))
4412 if running:
4413 if instance.admin_up:
4414 val = "running"
4415 else:
4416 val = "ERROR_up"
4417 else:
4418 if instance.admin_up:
4419 val = "ERROR_down"
4420 else:
4421 val = "ADMIN_down"
4422 elif field == "oper_ram":
4423 if instance.primary_node in bad_nodes:
4424 val = None
4425 elif instance.name in live_data:
4426 val = live_data[instance.name].get("memory", "?")
4427 else:
4428 val = "-"
4429 elif field == "vcpus":
4430 val = i_be[constants.BE_VCPUS]
4431 elif field == "disk_template":
4432 val = instance.disk_template
4433 elif field == "ip":
4434 if instance.nics:
4435 val = instance.nics[0].ip
4436 else:
4437 val = None
4438 elif field == "nic_mode":
4439 if instance.nics:
4440 val = i_nicp[0][constants.NIC_MODE]
4441 else:
4442 val = None
4443 elif field == "nic_link":
4444 if instance.nics:
4445 val = i_nicp[0][constants.NIC_LINK]
4446 else:
4447 val = None
4448 elif field == "bridge":
4449 if (instance.nics and
4450 i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
4451 val = i_nicp[0][constants.NIC_LINK]
4452 else:
4453 val = None
4454 elif field == "mac":
4455 if instance.nics:
4456 val = instance.nics[0].mac
4457 else:
4458 val = None
4459 elif field == "sda_size" or field == "sdb_size":
4460 idx = ord(field[2]) - ord('a')
4461 try:
4462 val = instance.FindDisk(idx).size
4463 except errors.OpPrereqError:
4464 val = None
4465 elif field == "disk_usage": # total disk usage per node
4466 disk_sizes = [{'size': disk.size} for disk in instance.disks]
4467 val = _ComputeDiskSize(instance.disk_template, disk_sizes)
4468 elif field == "tags":
4469 val = list(instance.GetTags())
4470 elif field == "hvparams":
4471 val = i_hv
4472 elif (field.startswith(HVPREFIX) and
4473 field[len(HVPREFIX):] in constants.HVS_PARAMETERS and
4474 field[len(HVPREFIX):] not in constants.HVC_GLOBALS):
4475 val = i_hv.get(field[len(HVPREFIX):], None)
4476 elif field == "beparams":
4477 val = i_be
4478 elif (field.startswith(BEPREFIX) and
4479 field[len(BEPREFIX):] in constants.BES_PARAMETERS):
4480 val = i_be.get(field[len(BEPREFIX):], None)
4481 elif st_match and st_match.groups():
4482 # matches a variable list
4483 st_groups = st_match.groups()
4484 if st_groups and st_groups[0] == "disk":
4485 if st_groups[1] == "count":
4486 val = len(instance.disks)
4487 elif st_groups[1] == "sizes":
4488 val = [disk.size for disk in instance.disks]
4489 elif st_groups[1] == "size":
4490 try:
4491 val = instance.FindDisk(st_groups[2]).size
4492 except errors.OpPrereqError:
4493 val = None
4494 else:
4495 assert False, "Unhandled disk parameter"
4496 elif st_groups[0] == "nic":
4497 if st_groups[1] == "count":
4498 val = len(instance.nics)
4499 elif st_groups[1] == "macs":
4500 val = [nic.mac for nic in instance.nics]
4501 elif st_groups[1] == "ips":
4502 val = [nic.ip for nic in instance.nics]
4503 elif st_groups[1] == "modes":
4504 val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
4505 elif st_groups[1] == "links":
4506 val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
4507 elif st_groups[1] == "bridges":
4508 val = []
4509 for nicp in i_nicp:
4510 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
4511 val.append(nicp[constants.NIC_LINK])
4512 else:
4513 val.append(None)
4514 else:
4515 # index-based item
4516 nic_idx = int(st_groups[2])
4517 if nic_idx >= len(instance.nics):
4518 val = None
4519 else:
4520 if st_groups[1] == "mac":
4521 val = instance.nics[nic_idx].mac
4522 elif st_groups[1] == "ip":
4523 val = instance.nics[nic_idx].ip
4524 elif st_groups[1] == "mode":
4525 val = i_nicp[nic_idx][constants.NIC_MODE]
4526 elif st_groups[1] == "link":
4527 val = i_nicp[nic_idx][constants.NIC_LINK]
4528 elif st_groups[1] == "bridge":
4529 nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
4530 if nic_mode == constants.NIC_MODE_BRIDGED:
4531 val = i_nicp[nic_idx][constants.NIC_LINK]
4532 else:
4533 val = None
4534 else:
4535 assert False, "Unhandled NIC parameter"
4536 else:
4537 assert False, ("Declared but unhandled variable parameter '%s'" %
4538 field)
4539 else:
4540 assert False, "Declared but unhandled parameter '%s'" % field
4541 iout.append(val)
4542 output.append(iout)
4544 return output
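# The result is one row per instance, columns ordered like
# self.op.output_fields; e.g. (illustrative values) for
# output_fields == ["name", "oper_state"]:
#   [["inst1.example.com", True], ["inst2.example.com", False]]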
4547 class LUFailoverInstance(LogicalUnit):
4548 """Failover an instance.
4550 """
4551 HPATH = "instance-failover"
4552 HTYPE = constants.HTYPE_INSTANCE
4553 _OP_REQP = ["instance_name", "ignore_consistency"]
4555 REQ_BGL = False
4556 def CheckArguments(self):
4557 """Check the arguments.
4559 """
4560 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4561 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4563 def ExpandNames(self):
4564 self._ExpandAndLockInstance()
4565 self.needed_locks[locking.LEVEL_NODE] = []
4566 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4568 def DeclareLocks(self, level):
4569 if level == locking.LEVEL_NODE:
4570 self._LockInstancesNodes()
4572 def BuildHooksEnv(self):
4573 """Build hooks env.
4575 This runs on master, primary and secondary nodes of the instance.
4577 """
4578 instance = self.instance
4579 source_node = instance.primary_node
4580 target_node = instance.secondary_nodes[0]
4581 env = {
4582 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
4583 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4584 "OLD_PRIMARY": source_node,
4585 "OLD_SECONDARY": target_node,
4586 "NEW_PRIMARY": target_node,
4587 "NEW_SECONDARY": source_node,
4588 }
4589 env.update(_BuildInstanceHookEnvByObject(self, instance))
4590 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4591 nl_post = list(nl)
4592 nl_post.append(source_node)
4593 return env, nl, nl_post
4595 def CheckPrereq(self):
4596 """Check prerequisites.
4598 This checks that the instance is in the cluster.
4600 """
4601 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4602 assert self.instance is not None, \
4603 "Cannot retrieve locked instance %s" % self.op.instance_name
4605 bep = self.cfg.GetClusterInfo().FillBE(instance)
4606 if instance.disk_template not in constants.DTS_NET_MIRROR:
4607 raise errors.OpPrereqError("Instance's disk layout is not"
4608 " network mirrored, cannot failover.",
4609 errors.ECODE_STATE)
4611 secondary_nodes = instance.secondary_nodes
4612 if not secondary_nodes:
4613 raise errors.ProgrammerError("no secondary node but using "
4614 "a mirrored disk template")
4616 target_node = secondary_nodes[0]
4617 _CheckNodeOnline(self, target_node)
4618 _CheckNodeNotDrained(self, target_node)
4619 if instance.admin_up:
4620 # check memory requirements on the secondary node
4621 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4622 instance.name, bep[constants.BE_MEMORY],
4623 instance.hypervisor)
4624 else:
4625 self.LogInfo("Not checking memory on the secondary node as"
4626 " instance will not be started")
4628 # check bridge existence
4629 _CheckInstanceBridgesExist(self, instance, node=target_node)
4631 def Exec(self, feedback_fn):
4632 """Failover an instance.
4634 The failover is done by shutting it down on its present node and
4635 starting it on the secondary.
4637 """
4638 instance = self.instance
4640 source_node = instance.primary_node
4641 target_node = instance.secondary_nodes[0]
4643 if instance.admin_up:
4644 feedback_fn("* checking disk consistency between source and target")
4645 for dev in instance.disks:
4646 # for drbd, these are drbd over lvm
4647 if not _CheckDiskConsistency(self, dev, target_node, False):
4648 if not self.op.ignore_consistency:
4649 raise errors.OpExecError("Disk %s is degraded on target node,"
4650 " aborting failover." % dev.iv_name)
4651 else:
4652 feedback_fn("* not checking disk consistency as instance is not running")
4654 feedback_fn("* shutting down instance on source node")
4655 logging.info("Shutting down instance %s on node %s",
4656 instance.name, source_node)
4658 result = self.rpc.call_instance_shutdown(source_node, instance,
4659 self.shutdown_timeout)
4660 msg = result.fail_msg
4661 if msg:
4662 if self.op.ignore_consistency:
4663 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4664 " Proceeding anyway. Please make sure node"
4665 " %s is down. Error details: %s",
4666 instance.name, source_node, source_node, msg)
4667 else:
4668 raise errors.OpExecError("Could not shutdown instance %s on"
4669 " node %s: %s" %
4670 (instance.name, source_node, msg))
4672 feedback_fn("* deactivating the instance's disks on source node")
4673 if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4674 raise errors.OpExecError("Can't shut down the instance's disks.")
4676 instance.primary_node = target_node
4677 # distribute new instance config to the other nodes
4678 self.cfg.Update(instance, feedback_fn)
4680 # Only start the instance if it's marked as up
4681 if instance.admin_up:
4682 feedback_fn("* activating the instance's disks on target node")
4683 logging.info("Starting instance %s on node %s",
4684 instance.name, target_node)
4686 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4687 ignore_secondaries=True)
4688 if not disks_ok:
4689 _ShutdownInstanceDisks(self, instance)
4690 raise errors.OpExecError("Can't activate the instance's disks")
4692 feedback_fn("* starting the instance on the target node")
4693 result = self.rpc.call_instance_start(target_node, instance, None, None)
4694 msg = result.fail_msg
4695 if msg:
4696 _ShutdownInstanceDisks(self, instance)
4697 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4698 (instance.name, target_node, msg))
4701 class LUMigrateInstance(LogicalUnit):
4702 """Migrate an instance.
4704 This is migration without shutting down, compared to the failover,
4705 which is done with shutdown.
4707 """
4708 HPATH = "instance-migrate"
4709 HTYPE = constants.HTYPE_INSTANCE
4710 _OP_REQP = ["instance_name", "live", "cleanup"]
4712 REQ_BGL = False
4714 def ExpandNames(self):
4715 self._ExpandAndLockInstance()
4717 self.needed_locks[locking.LEVEL_NODE] = []
4718 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4720 self._migrater = TLMigrateInstance(self, self.op.instance_name,
4721 self.op.live, self.op.cleanup)
4722 self.tasklets = [self._migrater]
4724 def DeclareLocks(self, level):
4725 if level == locking.LEVEL_NODE:
4726 self._LockInstancesNodes()
4728 def BuildHooksEnv(self):
4729 """Build hooks env.
4731 This runs on master, primary and secondary nodes of the instance.
4733 """
4734 instance = self._migrater.instance
4735 source_node = instance.primary_node
4736 target_node = instance.secondary_nodes[0]
4737 env = _BuildInstanceHookEnvByObject(self, instance)
4738 env["MIGRATE_LIVE"] = self.op.live
4739 env["MIGRATE_CLEANUP"] = self.op.cleanup
4740 env.update({
4741 "OLD_PRIMARY": source_node,
4742 "OLD_SECONDARY": target_node,
4743 "NEW_PRIMARY": target_node,
4744 "NEW_SECONDARY": source_node,
4745 })
4746 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4747 nl_post = list(nl)
4748 nl_post.append(source_node)
4749 return env, nl, nl_post
4752 class LUMoveInstance(LogicalUnit):
4753 """Move an instance by data-copying.
4755 """
4756 HPATH = "instance-move"
4757 HTYPE = constants.HTYPE_INSTANCE
4758 _OP_REQP = ["instance_name", "target_node"]
4760 REQ_BGL = False
4761 def CheckArguments(self):
4762 """Check the arguments.
4764 """
4765 self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
4766 constants.DEFAULT_SHUTDOWN_TIMEOUT)
4768 def ExpandNames(self):
4769 self._ExpandAndLockInstance()
4770 target_node = _ExpandNodeName(self.cfg, self.op.target_node)
4771 self.op.target_node = target_node
4772 self.needed_locks[locking.LEVEL_NODE] = [target_node]
4773 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4775 def DeclareLocks(self, level):
4776 if level == locking.LEVEL_NODE:
4777 self._LockInstancesNodes(primary_only=True)
4779 def BuildHooksEnv(self):
4780 """Build hooks env.
4782 This runs on master, primary and secondary nodes of the instance.
4784 """
4785 env = {
4786 "TARGET_NODE": self.op.target_node,
4787 "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
4788 }
4789 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4790 nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
4791 self.op.target_node]
4792 return env, nl, nl
4794 def CheckPrereq(self):
4795 """Check prerequisites.
4797 This checks that the instance is in the cluster.
4799 """
4800 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4801 assert self.instance is not None, \
4802 "Cannot retrieve locked instance %s" % self.op.instance_name
4804 node = self.cfg.GetNodeInfo(self.op.target_node)
4805 assert node is not None, \
4806 "Cannot retrieve locked node %s" % self.op.target_node
4808 self.target_node = target_node = node.name
4810 if target_node == instance.primary_node:
4811 raise errors.OpPrereqError("Instance %s is already on the node %s" %
4812 (instance.name, target_node),
4813 errors.ECODE_STATE)
4815 bep = self.cfg.GetClusterInfo().FillBE(instance)
4817 for idx, dsk in enumerate(instance.disks):
4818 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
4819 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
4820 " cannot copy" % idx, errors.ECODE_STATE)
4822 _CheckNodeOnline(self, target_node)
4823 _CheckNodeNotDrained(self, target_node)
4825 if instance.admin_up:
4826 # check memory requirements on the secondary node
4827 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4828 instance.name, bep[constants.BE_MEMORY],
4829 instance.hypervisor)
4830 else:
4831 self.LogInfo("Not checking memory on the secondary node as"
4832 " instance will not be started")
4834 # check bridge existence
4835 _CheckInstanceBridgesExist(self, instance, node=target_node)
4837 def Exec(self, feedback_fn):
4838 """Move an instance.
4840 The move is done by shutting it down on its present node, copying
4841 the data over (slow) and starting it on the new node.
4843 """
4844 instance = self.instance
4846 source_node = instance.primary_node
4847 target_node = self.target_node
4849 self.LogInfo("Shutting down instance %s on source node %s",
4850 instance.name, source_node)
4852 result = self.rpc.call_instance_shutdown(source_node, instance,
4853 self.shutdown_timeout)
4854 msg = result.fail_msg
4855 if msg:
4856 if self.op.ignore_consistency:
4857 self.proc.LogWarning("Could not shutdown instance %s on node %s."
4858 " Proceeding anyway. Please make sure node"
4859 " %s is down. Error details: %s",
4860 instance.name, source_node, source_node, msg)
4861 else:
4862 raise errors.OpExecError("Could not shutdown instance %s on"
4863 " node %s: %s" %
4864 (instance.name, source_node, msg))
4866 # create the target disks
4867 try:
4868 _CreateDisks(self, instance, target_node=target_node)
4869 except errors.OpExecError:
4870 self.LogWarning("Device creation failed, reverting...")
4871 try:
4872 _RemoveDisks(self, instance, target_node=target_node)
4873 finally:
4874 self.cfg.ReleaseDRBDMinors(instance.name)
4875 raise
4877 cluster_name = self.cfg.GetClusterInfo().cluster_name
4879 errs = []
4880 # activate, get path, copy the data over
4881 for idx, disk in enumerate(instance.disks):
4882 self.LogInfo("Copying data for disk %d", idx)
4883 result = self.rpc.call_blockdev_assemble(target_node, disk,
4884 instance.name, True)
4885 if result.fail_msg:
4886 self.LogWarning("Can't assemble newly created disk %d: %s",
4887 idx, result.fail_msg)
4888 errs.append(result.fail_msg)
4889 break
4890 dev_path = result.payload
4891 result = self.rpc.call_blockdev_export(source_node, disk,
4892 target_node, dev_path,
4893 cluster_name)
4894 if result.fail_msg:
4895 self.LogWarning("Can't copy data over for disk %d: %s",
4896 idx, result.fail_msg)
4897 errs.append(result.fail_msg)
4898 break
4900 if errs:
4901 self.LogWarning("Some disks failed to copy, aborting")
4902 try:
4903 _RemoveDisks(self, instance, target_node=target_node)
4904 finally:
4905 self.cfg.ReleaseDRBDMinors(instance.name)
4906 raise errors.OpExecError("Errors during disk copy: %s" %
4907 (",".join(errs),))
4909 instance.primary_node = target_node
4910 self.cfg.Update(instance, feedback_fn)
4912 self.LogInfo("Removing the disks on the original node")
4913 _RemoveDisks(self, instance, target_node=source_node)
4915 # Only start the instance if it's marked as up
4916 if instance.admin_up:
4917 self.LogInfo("Starting instance %s on node %s",
4918 instance.name, target_node)
4920 disks_ok, _ = _AssembleInstanceDisks(self, instance,
4921 ignore_secondaries=True)
4922 if not disks_ok:
4923 _ShutdownInstanceDisks(self, instance)
4924 raise errors.OpExecError("Can't activate the instance's disks")
4926 result = self.rpc.call_instance_start(target_node, instance, None, None)
4927 msg = result.fail_msg
4928 if msg:
4929 _ShutdownInstanceDisks(self, instance)
4930 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4931 (instance.name, target_node, msg))
4934 class LUMigrateNode(LogicalUnit):
4935 """Migrate all instances from a node.
4937 """
4938 HPATH = "node-migrate"
4939 HTYPE = constants.HTYPE_NODE
4940 _OP_REQP = ["node_name", "live"]
4942 REQ_BGL = False
4943 def ExpandNames(self):
4944 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
4946 self.needed_locks = {
4947 locking.LEVEL_NODE: [self.op.node_name],
4948 }
4950 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4952 # Create tasklets for migrating instances for all instances on this node
4953 names = []
4954 tasklets = []
4956 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4957 logging.debug("Migrating instance %s", inst.name)
4958 names.append(inst.name)
4960 tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4962 self.tasklets = tasklets
4964 # Declare instance locks
4965 self.needed_locks[locking.LEVEL_INSTANCE] = names
4967 def DeclareLocks(self, level):
4968 if level == locking.LEVEL_NODE:
4969 self._LockInstancesNodes()
4971 def BuildHooksEnv(self):
4972 """Build hooks env.
4974 This runs on the master, the primary and all the secondaries.
4976 """
4977 env = {
4978 "NODE_NAME": self.op.node_name,
4979 }
4981 nl = [self.cfg.GetMasterNode()]
4983 return (env, nl, nl)
4986 class TLMigrateInstance(Tasklet):
4987 def __init__(self, lu, instance_name, live, cleanup):
4988 """Initializes this class.
4990 """
4991 Tasklet.__init__(self, lu)
4993 # Parameters
4994 self.instance_name = instance_name
4995 self.live = live
4996 self.cleanup = cleanup
4998 def CheckPrereq(self):
4999 """Check prerequisites.
5001 This checks that the instance is in the cluster.
5003 """
5004 instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
5005 instance = self.cfg.GetInstanceInfo(instance_name)
5006 assert instance is not None
5008 if instance.disk_template != constants.DT_DRBD8:
5009 raise errors.OpPrereqError("Instance's disk layout is not"
5010 " drbd8, cannot migrate.", errors.ECODE_STATE)
5012 secondary_nodes = instance.secondary_nodes
5013 if not secondary_nodes:
5014 raise errors.ConfigurationError("No secondary node but using"
5015 " drbd8 disk template")
5017 i_be = self.cfg.GetClusterInfo().FillBE(instance)
5019 target_node = secondary_nodes[0]
5020 # check memory requirements on the secondary node
5021 _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
5022 instance.name, i_be[constants.BE_MEMORY],
5023 instance.hypervisor)
5025 # check bridge existence
5026 _CheckInstanceBridgesExist(self, instance, node=target_node)
5028 if not self.cleanup:
5029 _CheckNodeNotDrained(self, target_node)
5030 result = self.rpc.call_instance_migratable(instance.primary_node,
5031 instance)
5032 result.Raise("Can't migrate, please use failover",
5033 prereq=True, ecode=errors.ECODE_STATE)
5035 self.instance = instance
5037 def _WaitUntilSync(self):
5038 """Poll with custom rpc for disk sync.
5040 This uses our own step-based rpc call.
5042 """
5043 self.feedback_fn("* wait until resync is done")
5044 all_done = False
5045 while not all_done:
5046 all_done = True
5047 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
5048 self.nodes_ip,
5049 self.instance.disks)
5050 min_percent = 100
5051 for node, nres in result.items():
5052 nres.Raise("Cannot resync disks on node %s" % node)
5053 node_done, node_percent = nres.payload
5054 all_done = all_done and node_done
5055 if node_percent is not None:
5056 min_percent = min(min_percent, node_percent)
5057 if not all_done:
5058 if min_percent < 100:
5059 self.feedback_fn(" - progress: %.1f%%" % min_percent)
5060 time.sleep(2)
5062 def _EnsureSecondary(self, node):
5063 """Demote a node to secondary.
5065 """
5066 self.feedback_fn("* switching node %s to secondary mode" % node)
5068 for dev in self.instance.disks:
5069 self.cfg.SetDiskID(dev, node)
5071 result = self.rpc.call_blockdev_close(node, self.instance.name,
5072 self.instance.disks)
5073 result.Raise("Cannot change disk to secondary on node %s" % node)
5075 def _GoStandalone(self):
5076 """Disconnect from the network.
5078 """
5079 self.feedback_fn("* changing into standalone mode")
5080 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
5081 self.instance.disks)
5082 for node, nres in result.items():
5083 nres.Raise("Cannot disconnect disks node %s" % node)
5085 def _GoReconnect(self, multimaster):
5086 """Reconnect to the network.
5088 """
5089 if multimaster:
5090 msg = "dual-master"
5091 else:
5092 msg = "single-master"
5093 self.feedback_fn("* changing disks into %s mode" % msg)
5094 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
5095 self.instance.disks,
5096 self.instance.name, multimaster)
5097 for node, nres in result.items():
5098 nres.Raise("Cannot change disks config on node %s" % node)
5100 def _ExecCleanup(self):
5101 """Try to cleanup after a failed migration.
5103 The cleanup is done by:
5104 - check that the instance is running only on one node
5105 (and update the config if needed)
5106 - change disks on its secondary node to secondary
5107 - wait until disks are fully synchronized
5108 - disconnect from the network
5109 - change disks into single-master mode
5110 - wait again until disks are fully synchronized
5112 """
5113 instance = self.instance
5114 target_node = self.target_node
5115 source_node = self.source_node
5117 # check running on only one node
5118 self.feedback_fn("* checking where the instance actually runs"
5119 " (if this hangs, the hypervisor might be in"
5120 " a bad state)")
5121 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
5122 for node, result in ins_l.items():
5123 result.Raise("Can't contact node %s" % node)
5125 runningon_source = instance.name in ins_l[source_node].payload
5126 runningon_target = instance.name in ins_l[target_node].payload
5128 if runningon_source and runningon_target:
5129 raise errors.OpExecError("Instance seems to be running on two nodes,"
5130 " or the hypervisor is confused. You will have"
5131 " to ensure manually that it runs only on one"
5132 " and restart this operation.")
5134 if not (runningon_source or runningon_target):
5135 raise errors.OpExecError("Instance does not seem to be running at all."
5136 " In this case, it's safer to repair by"
5137 " running 'gnt-instance stop' to ensure disk"
5138 " shutdown, and then restarting it.")
5140 if runningon_target:
5141 # the migration has actually succeeded, we need to update the config
5142 self.feedback_fn("* instance running on secondary node (%s),"
5143 " updating config" % target_node)
5144 instance.primary_node = target_node
5145 self.cfg.Update(instance, self.feedback_fn)
5146 demoted_node = source_node
5148 self.feedback_fn("* instance confirmed to be running on its"
5149 " primary node (%s)" % source_node)
5150 demoted_node = target_node
5152 self._EnsureSecondary(demoted_node)
5153 try:
5154 self._WaitUntilSync()
5155 except errors.OpExecError:
5156 # we ignore here errors, since if the device is standalone, it
5157 # won't be able to sync
5158 pass
5159 self._GoStandalone()
5160 self._GoReconnect(False)
5161 self._WaitUntilSync()
5163 self.feedback_fn("* done")
5165 def _RevertDiskStatus(self):
5166 """Try to revert the disk status after a failed migration.
5168 """
5169 target_node = self.target_node
5170 try:
5171 self._EnsureSecondary(target_node)
5172 self._GoStandalone()
5173 self._GoReconnect(False)
5174 self._WaitUntilSync()
5175 except errors.OpExecError, err:
5176 self.lu.LogWarning("Migration failed and I can't reconnect the"
5177 " drives: error '%s'\n"
5178 "Please look and recover the instance status" %
5179 str(err))
5181 def _AbortMigration(self):
5182 """Call the hypervisor code to abort a started migration.
5184 """
5185 instance = self.instance
5186 target_node = self.target_node
5187 migration_info = self.migration_info
5189 abort_result = self.rpc.call_finalize_migration(target_node,
5190 instance,
5191 migration_info,
5192 False)
5193 abort_msg = abort_result.fail_msg
5194 if abort_msg:
5195 logging.error("Aborting migration failed on target node %s: %s",
5196 target_node, abort_msg)
5197 # Don't raise an exception here, as we still have to try to revert the
5198 # disk status, even if this step failed.
5200 def _ExecMigration(self):
5201 """Migrate an instance.
5203 The migrate is done by:
5204 - change the disks into dual-master mode
5205 - wait until disks are fully synchronized again
5206 - migrate the instance
5207 - change disks on the new secondary node (the old primary) to secondary
5208 - wait until disks are fully synchronized
5209 - change disks into single-master mode
5211 """
5212 instance = self.instance
5213 target_node = self.target_node
5214 source_node = self.source_node
5216 self.feedback_fn("* checking disk consistency between source and target")
5217 for dev in instance.disks:
5218 if not _CheckDiskConsistency(self, dev, target_node, False):
5219 raise errors.OpExecError("Disk %s is degraded or not fully"
5220 " synchronized on target node,"
5221 " aborting migrate." % dev.iv_name)
5223 # First get the migration information from the remote node
5224 result = self.rpc.call_migration_info(source_node, instance)
5225 msg = result.fail_msg
5226 if msg:
5227 log_err = ("Failed fetching source migration information from %s: %s" %
5228 (source_node, msg))
5229 logging.error(log_err)
5230 raise errors.OpExecError(log_err)
5232 self.migration_info = migration_info = result.payload
5234 # Then switch the disks to master/master mode
5235 self._EnsureSecondary(target_node)
5236 self._GoStandalone()
5237 self._GoReconnect(True)
5238 self._WaitUntilSync()
5240 self.feedback_fn("* preparing %s to accept the instance" % target_node)
5241 result = self.rpc.call_accept_instance(target_node,
5242 instance,
5243 migration_info,
5244 self.nodes_ip[target_node])
5246 msg = result.fail_msg
5247 if msg:
5248 logging.error("Instance pre-migration failed, trying to revert"
5249 " disk status: %s", msg)
5250 self.feedback_fn("Pre-migration failed, aborting")
5251 self._AbortMigration()
5252 self._RevertDiskStatus()
5253 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
5254 (instance.name, msg))
5256 self.feedback_fn("* migrating instance to %s" % target_node)
5258 result = self.rpc.call_instance_migrate(source_node, instance,
5259 self.nodes_ip[target_node],
5260 self.live)
5261 msg = result.fail_msg
5262 if msg:
5263 logging.error("Instance migration failed, trying to revert"
5264 " disk status: %s", msg)
5265 self.feedback_fn("Migration failed, aborting")
5266 self._AbortMigration()
5267 self._RevertDiskStatus()
5268 raise errors.OpExecError("Could not migrate instance %s: %s" %
5269 (instance.name, msg))
5272 instance.primary_node = target_node
5273 # distribute new instance config to the other nodes
5274 self.cfg.Update(instance, self.feedback_fn)
5276 result = self.rpc.call_finalize_migration(target_node,
5277 instance,
5278 migration_info,
5279 True)
5280 msg = result.fail_msg
5281 if msg:
5282 logging.error("Instance migration succeeded, but finalization failed:"
5283 " %s", msg)
5284 raise errors.OpExecError("Could not finalize instance migration: %s" %
5285 msg)
5287 self._EnsureSecondary(source_node)
5288 self._WaitUntilSync()
5289 self._GoStandalone()
5290 self._GoReconnect(False)
5291 self._WaitUntilSync()
5293 self.feedback_fn("* done")
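# The DRBD mode changes above, in call order (sketch):
#   _EnsureSecondary(target) -> _GoStandalone() -> _GoReconnect(True)
#   -> _WaitUntilSync() -> call_instance_migrate -> _EnsureSecondary(source)
#   -> _WaitUntilSync() -> _GoStandalone() -> _GoReconnect(False)
#   -> _WaitUntilSync()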
5295 def Exec(self, feedback_fn):
5296 """Perform the migration.
5298 """
5299 feedback_fn("Migrating instance %s" % self.instance.name)
5301 self.feedback_fn = feedback_fn
5303 self.source_node = self.instance.primary_node
5304 self.target_node = self.instance.secondary_nodes[0]
5305 self.all_nodes = [self.source_node, self.target_node]
5306 self.nodes_ip = {
5307 self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
5308 self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
5309 }
5311 if self.cleanup:
5312 return self._ExecCleanup()
5313 else:
5314 return self._ExecMigration()
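# The cleanup flag selects recovery from a previously failed migration
# instead of a fresh one; e.g. (sketch, assuming the standard CLI wiring)
# "gnt-instance migrate --cleanup" reaches this tasklet with
# self.cleanup == True and takes the _ExecCleanup() path.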
5317 def _CreateBlockDev(lu, node, instance, device, force_create,
5318 info, force_open):
5319 """Create a tree of block devices on a given node.
5321 If this device type has to be created on secondaries, create it and
5322 all its children.
5324 If not, just recurse to children keeping the same 'force' value.
5326 @param lu: the lu on whose behalf we execute
5327 @param node: the node on which to create the device
5328 @type instance: L{objects.Instance}
5329 @param instance: the instance which owns the device
5330 @type device: L{objects.Disk}
5331 @param device: the device to create
5332 @type force_create: boolean
5333 @param force_create: whether to force creation of this device; this
5334 will be changed to True whenever we find a device which has
5335 CreateOnSecondary() attribute
5336 @param info: the extra 'metadata' we should attach to the device
5337 (this will be represented as a LVM tag)
5338 @type force_open: boolean
5339 @param force_open: this parameter will be passed to the
5340 L{backend.BlockdevCreate} function where it specifies
5341 whether we run on primary or not, and it affects both
5342 the child assembly and the device's own Open() execution
5344 """
5345 if device.CreateOnSecondary():
5346 force_create = True
5348 if device.children:
5349 for child in device.children:
5350 _CreateBlockDev(lu, node, instance, child, force_create,
5351 info, force_open)
5353 if not force_create:
5354 return
5356 _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
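# Illustrative call, mirroring how _CreateDisks drives this function: on
# the primary node both force flags are True, on a secondary node both are
# False (values assumed for the example):
#   f_create = node == pnode
#   _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)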
5359 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
5360 """Create a single block device on a given node.
5362 This will not recurse over children of the device, so they must be
5363 created in advance.
5365 @param lu: the lu on whose behalf we execute
5366 @param node: the node on which to create the device
5367 @type instance: L{objects.Instance}
5368 @param instance: the instance which owns the device
5369 @type device: L{objects.Disk}
5370 @param device: the device to create
5371 @param info: the extra 'metadata' we should attach to the device
5372 (this will be represented as a LVM tag)
5373 @type force_open: boolean
5374 @param force_open: this parameter will be passed to the
5375 L{backend.BlockdevCreate} function where it specifies
5376 whether we run on primary or not, and it affects both
5377 the child assembly and the device's own Open() execution
5379 """
5380 lu.cfg.SetDiskID(device, node)
5381 result = lu.rpc.call_blockdev_create(node, device, device.size,
5382 instance.name, force_open, info)
5383 result.Raise("Can't create block device %s on"
5384 " node %s for instance %s" % (device, node, instance.name))
5385 if device.physical_id is None:
5386 device.physical_id = result.payload
5389 def _GenerateUniqueNames(lu, exts):
5390 """Generate a suitable LV name.
5392 This will generate a logical volume name for the given instance.
5394 """
5395 results = []
5396 for val in exts:
5397 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
5398 results.append("%s%s" % (new_id, val))
5400 return results
5402 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
5403 p_minor, s_minor):
5404 """Generate a drbd8 device complete with its children.
5406 """
5407 port = lu.cfg.AllocatePort()
5408 vgname = lu.cfg.GetVGName()
5409 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
5410 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5411 logical_id=(vgname, names[0]))
5412 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5413 logical_id=(vgname, names[1]))
5414 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
5415 logical_id=(primary, secondary, port,
5416 p_minor, s_minor,
5417 shared_secret),
5418 children=[dev_data, dev_meta],
5419 iv_name=iv_name)
5420 return drbd_dev
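# Resulting device tree (sketch, assuming names == ["0_data", "0_meta"]):
#   LD_DRBD8(primary, secondary, port, p_minor, s_minor, shared_secret)
#     +- LD_LV(vgname, "0_data"), <size> MB of instance data
#     +- LD_LV(vgname, "0_meta"), 128 MB of DRBD metadata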
5423 def _GenerateDiskTemplate(lu, template_name,
5424 instance_name, primary_node,
5425 secondary_nodes, disk_info,
5426 file_storage_dir, file_driver,
5427 base_index):
5428 """Generate the entire disk layout for a given template type.
5430 """
5431 #TODO: compute space requirements
5433 vgname = lu.cfg.GetVGName()
5434 disk_count = len(disk_info)
5435 disks = []
5436 if template_name == constants.DT_DISKLESS:
5437 pass
5438 elif template_name == constants.DT_PLAIN:
5439 if len(secondary_nodes) != 0:
5440 raise errors.ProgrammerError("Wrong template configuration")
5442 names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5443 for i in range(disk_count)])
5444 for idx, disk in enumerate(disk_info):
5445 disk_index = idx + base_index
5446 disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
5447 logical_id=(vgname, names[idx]),
5448 iv_name="disk/%d" % disk_index,
5449 mode=disk["mode"])
5450 disks.append(disk_dev)
5451 elif template_name == constants.DT_DRBD8:
5452 if len(secondary_nodes) != 1:
5453 raise errors.ProgrammerError("Wrong template configuration")
5454 remote_node = secondary_nodes[0]
5455 minors = lu.cfg.AllocateDRBDMinor(
5456 [primary_node, remote_node] * len(disk_info), instance_name)
5458 names = []
5459 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
5460 for i in range(disk_count)]):
5461 names.append(lv_prefix + "_data")
5462 names.append(lv_prefix + "_meta")
5463 for idx, disk in enumerate(disk_info):
5464 disk_index = idx + base_index
5465 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
5466 disk["size"], names[idx*2:idx*2+2],
5467 "disk/%d" % disk_index,
5468 minors[idx*2], minors[idx*2+1])
5469 disk_dev.mode = disk["mode"]
5470 disks.append(disk_dev)
5471 elif template_name == constants.DT_FILE:
5472 if len(secondary_nodes) != 0:
5473 raise errors.ProgrammerError("Wrong template configuration")
5475 for idx, disk in enumerate(disk_info):
5476 disk_index = idx + base_index
5477 disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
5478 iv_name="disk/%d" % disk_index,
5479 logical_id=(file_driver,
5480 "%s/disk%d" % (file_storage_dir,
5481 disk_index)),
5482 mode=disk["mode"])
5483 disks.append(disk_dev)
5485 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
5487 return disks
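# Example (illustrative): a two-disk DT_PLAIN request yields two LD_LV Disk
# objects with iv_name "disk/0" and "disk/1"; under DT_DRBD8 each disk
# becomes a _GenerateDRBD8Branch tree instead, and DT_DISKLESS returns [].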
5489 def _GetInstanceInfoText(instance):
5490 """Compute the text that should be added to the disk's metadata.
5492 """
5493 return "originstname+%s" % instance.name
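# E.g. an instance named "inst1.example.com" (name illustrative) gets the
# LVM tag "originstname+inst1.example.com" attached to its disks.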
5496 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
5497 """Create all disks for an instance.
5499 This abstracts away some work from AddInstance.
5501 @type lu: L{LogicalUnit}
5502 @param lu: the logical unit on whose behalf we execute
5503 @type instance: L{objects.Instance}
5504 @param instance: the instance whose disks we should create
5505 @type to_skip: list
5506 @param to_skip: list of indices to skip
5507 @type target_node: string
5508 @param target_node: if passed, overrides the target node for creation
5510 @return: the success of the creation
5512 """
5513 info = _GetInstanceInfoText(instance)
5514 if target_node is None:
5515 pnode = instance.primary_node
5516 all_nodes = instance.all_nodes
5517 else:
5518 pnode = target_node
5519 all_nodes = [pnode]
5521 if instance.disk_template == constants.DT_FILE:
5522 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5523 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
5525 result.Raise("Failed to create directory '%s' on"
5526 " node %s" % (file_storage_dir, pnode))
5528 # Note: this needs to be kept in sync with adding of disks in
5529 # LUSetInstanceParams
5530 for idx, device in enumerate(instance.disks):
5531 if to_skip and idx in to_skip:
5532 continue
5533 logging.info("Creating volume %s for instance %s",
5534 device.iv_name, instance.name)
5536 for node in all_nodes:
5537 f_create = node == pnode
5538 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
5541 def _RemoveDisks(lu, instance, target_node=None):
5542 """Remove all disks for an instance.
5544 This abstracts away some work from `AddInstance()` and
5545 `RemoveInstance()`. Note that in case some of the devices couldn't
5546 be removed, the removal will continue with the other ones (compare
5547 with `_CreateDisks()`).
5549 @type lu: L{LogicalUnit}
5550 @param lu: the logical unit on whose behalf we execute
5551 @type instance: L{objects.Instance}
5552 @param instance: the instance whose disks we should remove
5553 @type target_node: string
5554 @param target_node: used to override the node on which to remove the disks
5555 @rtype: boolean
5556 @return: the success of the removal
5558 """
5559 logging.info("Removing block devices for instance %s", instance.name)
5561 all_result = True
5562 for device in instance.disks:
5563 if target_node:
5564 edata = [(target_node, device)]
5565 else:
5566 edata = device.ComputeNodeTree(instance.primary_node)
5567 for node, disk in edata:
5568 lu.cfg.SetDiskID(disk, node)
5569 msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
5570 if msg:
5571 lu.LogWarning("Could not remove block device %s on node %s,"
5572 " continuing anyway: %s", device.iv_name, node, msg)
5573 all_result = False
5575 if instance.disk_template == constants.DT_FILE:
5576 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
5577 if target_node:
5578 tgt = target_node
5579 else:
5580 tgt = instance.primary_node
5581 result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
5582 if result.fail_msg:
5583 lu.LogWarning("Could not remove directory '%s' on node %s: %s",
5584 file_storage_dir, instance.primary_node, result.fail_msg)
5585 all_result = False
5587 return all_result
5590 def _ComputeDiskSize(disk_template, disks):
5591 """Compute disk size requirements in the volume group
5593 """
5594 # Required free disk space as a function of disk and swap space
5595 req_size_dict = {
5596 constants.DT_DISKLESS: None,
5597 constants.DT_PLAIN: sum(d["size"] for d in disks),
5598 # 128 MB are added for drbd metadata for each disk
5599 constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
5600 constants.DT_FILE: None,
5601 }
5603 if disk_template not in req_size_dict:
5604 raise errors.ProgrammerError("Disk template '%s' size requirement"
5605 " is unknown" % disk_template)
5607 return req_size_dict[disk_template]
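# Worked example (illustrative sizes): two 1024 MB disks require
#   _ComputeDiskSize(constants.DT_PLAIN, [{"size": 1024}, {"size": 1024}])
#     == 2048
#   _ComputeDiskSize(constants.DT_DRBD8, [{"size": 1024}, {"size": 1024}])
#     == 2304   (128 MB of DRBD metadata per disk)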
5610 def _CheckHVParams(lu, nodenames, hvname, hvparams):
5611 """Hypervisor parameter validation.
5613 This function abstracts the hypervisor parameter validation to be
5614 used in both instance create and instance modify.
5616 @type lu: L{LogicalUnit}
5617 @param lu: the logical unit for which we check
5618 @type nodenames: list
5619 @param nodenames: the list of nodes on which we should check
5620 @type hvname: string
5621 @param hvname: the name of the hypervisor we should use
5622 @type hvparams: dict
5623 @param hvparams: the parameters which we need to check
5624 @raise errors.OpPrereqError: if the parameters are not valid
5626 """
5627 hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
5628 hvname,
5629 hvparams)
5630 for node in nodenames:
5631 info = hvinfo[node]
5632 if info.offline:
5633 continue
5634 info.Raise("Hypervisor parameter validation failed on node %s" % node)
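# Typical use (sketch): after filling the instance's hypervisor dict from
# the cluster defaults, validate it on every node that may host the
# instance, e.g.:
#   _CheckHVParams(self, nodenames, self.op.hypervisor, filled_hvp)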
5637 class LUCreateInstance(LogicalUnit):
5638 """Create an instance.
5640 """
5641 HPATH = "instance-add"
5642 HTYPE = constants.HTYPE_INSTANCE
5643 _OP_REQP = ["instance_name", "disks", "disk_template",
5644 "mode", "start",
5645 "wait_for_sync", "ip_check", "nics",
5646 "hvparams", "beparams"]
5648 REQ_BGL = False
5649 def CheckArguments(self):
5650 """Check arguments.
5652 """
5653 # do not require name_check to ease forward/backward compatibility
5654 # for tools
5655 if not hasattr(self.op, "name_check"):
5656 self.op.name_check = True
5657 if self.op.ip_check and not self.op.name_check:
5658 # TODO: make the ip check more flexible and not depend on the name check
5659 raise errors.OpPrereqError("Cannot do ip checks without a name check",
5660 errors.ECODE_INVAL)
5662 def ExpandNames(self):
5663 """ExpandNames for CreateInstance.
5665 Figure out the right locks for instance creation.
5667 """
5668 self.needed_locks = {}
5670 # set optional parameters to none if they don't exist
5671 for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
5672 if not hasattr(self.op, attr):
5673 setattr(self.op, attr, None)
5675 # cheap checks, mostly valid constants given
5677 # verify creation mode
5678 if self.op.mode not in (constants.INSTANCE_CREATE,
5679 constants.INSTANCE_IMPORT):
5680 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
5681 self.op.mode, errors.ECODE_INVAL)
5683 # disk template and mirror node verification
5684 if self.op.disk_template not in constants.DISK_TEMPLATES:
5685 raise errors.OpPrereqError("Invalid disk template name",
5686 errors.ECODE_INVAL)
5688 if self.op.hypervisor is None:
5689 self.op.hypervisor = self.cfg.GetHypervisorType()
5691 cluster = self.cfg.GetClusterInfo()
5692 enabled_hvs = cluster.enabled_hypervisors
5693 if self.op.hypervisor not in enabled_hvs:
5694 raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
5695 " cluster (%s)" % (self.op.hypervisor,
5696 ",".join(enabled_hvs)),
5697 errors.ECODE_STATE)
5699 # check hypervisor parameter syntax (locally)
5700 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
5701 filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
5702 self.op.hvparams)
5703 hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
5704 hv_type.CheckParameterSyntax(filled_hvp)
5705 self.hv_full = filled_hvp
5706 # check that we don't specify global parameters on an instance
5707 _CheckGlobalHvParams(self.op.hvparams)
5709 # fill and remember the beparams dict
5710 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
5711 self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
5712 self.op.beparams)
5714 #### instance parameters check
5716 # instance name verification
5717 if self.op.name_check:
5718 hostname1 = utils.GetHostInfo(self.op.instance_name)
5719 self.op.instance_name = instance_name = hostname1.name
5720 # used in CheckPrereq for ip ping check
5721 self.check_ip = hostname1.ip
5722 else:
5723 instance_name = self.op.instance_name
5724 self.check_ip = None
5726 # this is just a preventive check, but someone might still add this
5727 # instance in the meantime, and creation will fail at lock-add time
5728 if instance_name in self.cfg.GetInstanceList():
5729 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
5730 instance_name, errors.ECODE_EXISTS)
5732 self.add_locks[locking.LEVEL_INSTANCE] = instance_name
5734 # NIC buildup
5735 self.nics = []
5736 for idx, nic in enumerate(self.op.nics):
5737 nic_mode_req = nic.get("mode", None)
5738 nic_mode = nic_mode_req
5739 if nic_mode is None:
5740 nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
5742 # in routed mode, for the first nic, the default ip is 'auto'
5743 if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
5744 default_ip_mode = constants.VALUE_AUTO
5746 default_ip_mode = constants.VALUE_NONE
5748 # ip validity checks
5749 ip = nic.get("ip", default_ip_mode)
5750 if ip is None or ip.lower() == constants.VALUE_NONE:
5751 nic_ip = None
5752 elif ip.lower() == constants.VALUE_AUTO:
5753 if not self.op.name_check:
5754 raise errors.OpPrereqError("IP address set to auto but name checks"
5755 " have been skipped. Aborting.",
5756 errors.ECODE_INVAL)
5757 nic_ip = hostname1.ip
5758 else:
5759 if not utils.IsValidIP(ip):
5760 raise errors.OpPrereqError("Given IP address '%s' doesn't look"
5761 " like a valid IP" % ip,
5762 errors.ECODE_INVAL)
5763 nic_ip = ip
5765 # TODO: check the ip address for uniqueness
5766 if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
5767 raise errors.OpPrereqError("Routed nic mode requires an ip address",
5768 errors.ECODE_INVAL)
5770 # MAC address verification
5771 mac = nic.get("mac", constants.VALUE_AUTO)
5772 if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5773 mac = utils.NormalizeAndValidateMac(mac)
5775 try:
5776 self.cfg.ReserveMAC(mac, self.proc.GetECId())
5777 except errors.ReservationError:
5778 raise errors.OpPrereqError("MAC address %s already in use"
5779 " in cluster" % mac,
5780 errors.ECODE_NOTUNIQUE)
5782 # bridge verification
5783 bridge = nic.get("bridge", None)
5784 link = nic.get("link", None)
5785 if bridge and link:
5786 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5787 " at the same time", errors.ECODE_INVAL)
5788 elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
5789 raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
5790 errors.ECODE_INVAL)
5791 elif bridge:
5792 link = bridge
5794 nicparams = {}
5795 if nic_mode_req:
5796 nicparams[constants.NIC_MODE] = nic_mode_req
5797 if link:
5798 nicparams[constants.NIC_LINK] = link
5800 check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
5801 nicparams)
5802 objects.NIC.CheckParameterSyntax(check_params)
5803 self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
5805 # disk checks/pre-build
5806 self.disks = []
5807 for disk in self.op.disks:
5808 mode = disk.get("mode", constants.DISK_RDWR)
5809 if mode not in constants.DISK_ACCESS_SET:
5810 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
5811 mode, errors.ECODE_INVAL)
5812 size = disk.get("size", None)
5813 if size is None:
5814 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
5815 try:
5816 size = int(size)
5817 except (TypeError, ValueError):
5818 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
5819 errors.ECODE_INVAL)
5820 self.disks.append({"size": size, "mode": mode})
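# Illustrative example (added, values hypothetical): an opcode disk spec
# of {"size": "10240", "mode": "rw"} passes the checks above and is
# stored here as {"size": 10240, "mode": "rw"}, i.e. a 10 GiB read-write
# disk.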
5822 # file storage checks
5823 if (self.op.file_driver and
5824 not self.op.file_driver in constants.FILE_DRIVER):
5825 raise errors.OpPrereqError("Invalid file driver name '%s'" %
5826 self.op.file_driver, errors.ECODE_INVAL)
5828 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
5829 raise errors.OpPrereqError("File storage directory path not absolute",
5830 errors.ECODE_INVAL)
5832 ### Node/iallocator related checks
5833 if [self.op.iallocator, self.op.pnode].count(None) != 1:
5834 raise errors.OpPrereqError("One and only one of iallocator and primary"
5835 " node must be given",
5838 if self.op.iallocator:
5839 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5840 else:
5841 self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
5842 nodelist = [self.op.pnode]
5843 if self.op.snode is not None:
5844 self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
5845 nodelist.append(self.op.snode)
5846 self.needed_locks[locking.LEVEL_NODE] = nodelist
5848 # in case of import lock the source node too
5849 if self.op.mode == constants.INSTANCE_IMPORT:
5850 src_node = getattr(self.op, "src_node", None)
5851 src_path = getattr(self.op, "src_path", None)
5853 if src_path is None:
5854 self.op.src_path = src_path = self.op.instance_name
5856 if src_node is None:
5857 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5858 self.op.src_node = None
5859 if os.path.isabs(src_path):
5860 raise errors.OpPrereqError("Importing an instance from an absolute"
5861 " path requires a source node option.",
5864 self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
5865 if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5866 self.needed_locks[locking.LEVEL_NODE].append(src_node)
5867 if not os.path.isabs(src_path):
5868 self.op.src_path = src_path = \
5869 os.path.join(constants.EXPORT_DIR, src_path)
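# Illustrative example (added, path hypothetical): a relative src_path of
# "instance1.example.com" becomes
# os.path.join(constants.EXPORT_DIR, "instance1.example.com"), so the
# rest of the import code can assume an absolute path in both cases.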
5871 # On import force_variant must be True, because if we forced it at
5872 # initial install, our only chance when importing it back is that it
5873 # works again!
5874 self.op.force_variant = True
5876 else: # INSTANCE_CREATE
5877 if getattr(self.op, "os_type", None) is None:
5878 raise errors.OpPrereqError("No guest OS specified",
5879 errors.ECODE_INVAL)
5880 self.op.force_variant = getattr(self.op, "force_variant", False)
5882 def _RunAllocator(self):
5883 """Run the allocator based on input opcode.
5885 """
5886 nics = [n.ToDict() for n in self.nics]
5887 ial = IAllocator(self.cfg, self.rpc,
5888 mode=constants.IALLOCATOR_MODE_ALLOC,
5889 name=self.op.instance_name,
5890 disk_template=self.op.disk_template,
5891 tags=[],
5892 os=self.op.os_type,
5893 vcpus=self.be_full[constants.BE_VCPUS],
5894 mem_size=self.be_full[constants.BE_MEMORY],
5895 disks=self.disks,
5896 nics=nics,
5897 hypervisor=self.op.hypervisor,
5898 )
5900 ial.Run(self.op.iallocator)
5902 if not ial.success:
5903 raise errors.OpPrereqError("Can't compute nodes using"
5904 " iallocator '%s': %s" %
5905 (self.op.iallocator, ial.info),
5906 errors.ECODE_NORES)
5907 if len(ial.result) != ial.required_nodes:
5908 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5909 " of nodes (%s), required %s" %
5910 (self.op.iallocator, len(ial.result),
5911 ial.required_nodes), errors.ECODE_FAULT)
5912 self.op.pnode = ial.result[0]
5913 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5914 self.op.instance_name, self.op.iallocator,
5915 utils.CommaJoin(ial.result))
5916 if ial.required_nodes == 2:
5917 self.op.snode = ial.result[1]
5919 def BuildHooksEnv(self):
5920 """Build hooks env.
5922 This runs on master, primary and secondary nodes of the instance.
5924 """
5925 env = {
5926 "ADD_MODE": self.op.mode,
5927 }
5928 if self.op.mode == constants.INSTANCE_IMPORT:
5929 env["SRC_NODE"] = self.op.src_node
5930 env["SRC_PATH"] = self.op.src_path
5931 env["SRC_IMAGES"] = self.src_images
5933 env.update(_BuildInstanceHookEnv(
5934 name=self.op.instance_name,
5935 primary_node=self.op.pnode,
5936 secondary_nodes=self.secondaries,
5937 status=self.op.start,
5938 os_type=self.op.os_type,
5939 memory=self.be_full[constants.BE_MEMORY],
5940 vcpus=self.be_full[constants.BE_VCPUS],
5941 nics=_NICListToTuple(self, self.nics),
5942 disk_template=self.op.disk_template,
5943 disks=[(d["size"], d["mode"]) for d in self.disks],
5944 bep=self.be_full,
5945 hvp=self.hv_full,
5946 hypervisor_name=self.op.hypervisor,
5947 ))
5949 nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5950 self.secondaries)
5951 return env, nl, nl
5954 def CheckPrereq(self):
5955 """Check prerequisites.
5957 """
5958 if (not self.cfg.GetVGName() and
5959 self.op.disk_template not in constants.DTS_NOT_LVM):
5960 raise errors.OpPrereqError("Cluster does not support lvm-based"
5961 " instances", errors.ECODE_STATE)
5963 if self.op.mode == constants.INSTANCE_IMPORT:
5964 src_node = self.op.src_node
5965 src_path = self.op.src_path
5967 if src_node is None:
5968 locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5969 exp_list = self.rpc.call_export_list(locked_nodes)
5970 found = False
5971 for node in exp_list:
5972 if exp_list[node].fail_msg:
5973 continue
5974 if src_path in exp_list[node].payload:
5975 found = True
5976 self.op.src_node = src_node = node
5977 self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5978 src_path)
5979 break
5980 if not found:
5981 raise errors.OpPrereqError("No export found for relative path %s" %
5982 src_path, errors.ECODE_INVAL)
5984 _CheckNodeOnline(self, src_node)
5985 result = self.rpc.call_export_info(src_node, src_path)
5986 result.Raise("No export or invalid export found in dir %s" % src_path)
5988 export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5989 if not export_info.has_section(constants.INISECT_EXP):
5990 raise errors.ProgrammerError("Corrupted export config",
5991 errors.ECODE_ENVIRON)
5993 ei_version = export_info.get(constants.INISECT_EXP, 'version')
5994 if (int(ei_version) != constants.EXPORT_VERSION):
5995 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5996 (ei_version, constants.EXPORT_VERSION),
5997 errors.ECODE_ENVIRON)
5999 # Check that the new instance doesn't have less disks than the export
6000 instance_disks = len(self.disks)
6001 export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
6002 if instance_disks < export_disks:
6003 raise errors.OpPrereqError("Not enough disks to import."
6004 " (instance: %d, export: %d)" %
6005 (instance_disks, export_disks),
6006 errors.ECODE_INVAL)
6008 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
6009 disk_images = []
6010 for idx in range(export_disks):
6011 option = 'disk%d_dump' % idx
6012 if export_info.has_option(constants.INISECT_INS, option):
6013 # FIXME: are the old os-es, disk sizes, etc. useful?
6014 export_name = export_info.get(constants.INISECT_INS, option)
6015 image = os.path.join(src_path, export_name)
6016 disk_images.append(image)
6017 else:
6018 disk_images.append(False)
6020 self.src_images = disk_images
6022 old_name = export_info.get(constants.INISECT_INS, 'name')
6023 # FIXME: int() here could throw a ValueError on broken exports
6024 exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
6025 if self.op.instance_name == old_name:
6026 for idx, nic in enumerate(self.nics):
6027 if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
6028 nic_mac_ini = 'nic%d_mac' % idx
6029 nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
6031 # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
6033 # ip ping checks (we use the same ip that was resolved in ExpandNames)
6034 if self.op.ip_check:
6035 if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
6036 raise errors.OpPrereqError("IP %s of instance %s already in use" %
6037 (self.check_ip, self.op.instance_name),
6038 errors.ECODE_NOTUNIQUE)
6040 #### mac address generation
6041 # By generating here the mac address both the allocator and the hooks get
6042 # the real final mac address rather than the 'auto' or 'generate' value.
6043 # There is a race condition between the generation and the instance object
6044 # creation, which means that we know the mac is valid now, but we're not
6045 # sure it will be when we actually add the instance. If things go bad
6046 # adding the instance will abort because of a duplicate mac, and the
6047 # creation job will fail.
6048 for nic in self.nics:
6049 if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6050 nic.mac = self.cfg.GenerateMAC(self.proc.GetECId())
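# Illustrative sketch (added, values hypothetical): after this loop every
# nic.mac is a concrete address such as "aa:00:00:98:ab:12"; only the
# GenerateMAC/ReserveMAC reservations against our execution context id
# protect these addresses until AddInstance commits the configuration.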
6052 #### allocator run
6054 if self.op.iallocator is not None:
6055 self._RunAllocator()
6057 #### node related checks
6059 # check primary node
6060 self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
6061 assert self.pnode is not None, \
6062 "Cannot retrieve locked node %s" % self.op.pnode
6064 raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
6065 pnode.name, errors.ECODE_STATE)
6066 if pnode.drained:
6067 raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
6068 pnode.name, errors.ECODE_STATE)
6070 self.secondaries = []
6072 # mirror node verification
6073 if self.op.disk_template in constants.DTS_NET_MIRROR:
6074 if self.op.snode is None:
6075 raise errors.OpPrereqError("The networked disk templates need"
6076 " a mirror node", errors.ECODE_INVAL)
6077 if self.op.snode == pnode.name:
6078 raise errors.OpPrereqError("The secondary node cannot be the"
6079 " primary node.", errors.ECODE_INVAL)
6080 _CheckNodeOnline(self, self.op.snode)
6081 _CheckNodeNotDrained(self, self.op.snode)
6082 self.secondaries.append(self.op.snode)
6084 nodenames = [pnode.name] + self.secondaries
6086 req_size = _ComputeDiskSize(self.op.disk_template,
6087 self.disks)
6089 # Check lv size requirements
6090 if req_size is not None:
6091 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6092 self.op.hypervisor)
6093 for node in nodenames:
6094 info = nodeinfo[node]
6095 info.Raise("Cannot get current information from node %s" % node)
6096 info = info.payload
6097 vg_free = info.get('vg_free', None)
6098 if not isinstance(vg_free, int):
6099 raise errors.OpPrereqError("Can't compute free disk space on"
6100 " node %s" % node, errors.ECODE_ENVIRON)
6101 if req_size > vg_free:
6102 raise errors.OpPrereqError("Not enough disk space on target node %s."
6103 " %d MB available, %d MB required" %
6104 (node, vg_free, req_size),
6105 errors.ECODE_NORES)
6107 _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
6109 # os verification
6110 result = self.rpc.call_os_get(pnode.name, self.op.os_type)
6111 result.Raise("OS '%s' not in supported os list for primary node %s" %
6112 (self.op.os_type, pnode.name),
6113 prereq=True, ecode=errors.ECODE_INVAL)
6114 if not self.op.force_variant:
6115 _CheckOSVariant(result.payload, self.op.os_type)
6117 _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
6119 # memory check on primary node
6120 if self.op.start:
6121 _CheckNodeFreeMemory(self, self.pnode.name,
6122 "creating instance %s" % self.op.instance_name,
6123 self.be_full[constants.BE_MEMORY],
6124 self.op.hypervisor)
6126 self.dry_run_result = list(nodenames)
6128 def Exec(self, feedback_fn):
6129 """Create and add the instance to the cluster.
6131 """
6132 instance = self.op.instance_name
6133 pnode_name = self.pnode.name
6135 ht_kind = self.op.hypervisor
6136 if ht_kind in constants.HTS_REQ_PORT:
6137 network_port = self.cfg.AllocatePort()
6138 else:
6139 network_port = None
6141 ##if self.op.vnc_bind_address is None:
6142 ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
6144 # this is needed because os.path.join does not accept None arguments
6145 if self.op.file_storage_dir is None:
6146 string_file_storage_dir = ""
6147 else:
6148 string_file_storage_dir = self.op.file_storage_dir
6150 # build the full file storage dir path
6151 file_storage_dir = os.path.normpath(os.path.join(
6152 self.cfg.GetFileStorageDir(),
6153 string_file_storage_dir, instance))
6156 disks = _GenerateDiskTemplate(self,
6157 self.op.disk_template,
6158 instance, pnode_name,
6159 self.secondaries,
6160 self.disks,
6161 file_storage_dir,
6162 self.op.file_driver,
6163 0)
6165 iobj = objects.Instance(name=instance, os=self.op.os_type,
6166 primary_node=pnode_name,
6167 nics=self.nics, disks=disks,
6168 disk_template=self.op.disk_template,
6169 admin_up=False,
6170 network_port=network_port,
6171 beparams=self.op.beparams,
6172 hvparams=self.op.hvparams,
6173 hypervisor=self.op.hypervisor,
6174 )
6176 feedback_fn("* creating instance disks...")
6177 try:
6178 _CreateDisks(self, iobj)
6179 except errors.OpExecError:
6180 self.LogWarning("Device creation failed, reverting...")
6181 try:
6182 _RemoveDisks(self, iobj)
6183 finally:
6184 self.cfg.ReleaseDRBDMinors(instance)
6185 raise
6187 feedback_fn("adding instance %s to cluster config" % instance)
6189 self.cfg.AddInstance(iobj, self.proc.GetECId())
6191 # Declare that we don't want to remove the instance lock anymore, as we've
6192 # added the instance to the config
6193 del self.remove_locks[locking.LEVEL_INSTANCE]
6194 # Unlock all the nodes
6195 if self.op.mode == constants.INSTANCE_IMPORT:
6196 nodes_keep = [self.op.src_node]
6197 nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
6198 if node != self.op.src_node]
6199 self.context.glm.release(locking.LEVEL_NODE, nodes_release)
6200 self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
6201 else:
6202 self.context.glm.release(locking.LEVEL_NODE)
6203 del self.acquired_locks[locking.LEVEL_NODE]
6205 if self.op.wait_for_sync:
6206 disk_abort = not _WaitForSync(self, iobj)
6207 elif iobj.disk_template in constants.DTS_NET_MIRROR:
6208 # make sure the disks are not degraded (still sync-ing is ok)
6209 time.sleep(15)
6210 feedback_fn("* checking mirrors status")
6211 disk_abort = not _WaitForSync(self, iobj, oneshot=True)
6212 else:
6213 disk_abort = False
6215 if disk_abort:
6216 _RemoveDisks(self, iobj)
6217 self.cfg.RemoveInstance(iobj.name)
6218 # Make sure the instance lock gets removed
6219 self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
6220 raise errors.OpExecError("There are some degraded disks for"
6221 " this instance")
6223 feedback_fn("creating os for instance %s on node %s" %
6224 (instance, pnode_name))
6226 if iobj.disk_template != constants.DT_DISKLESS:
6227 if self.op.mode == constants.INSTANCE_CREATE:
6228 feedback_fn("* running the instance OS create scripts...")
6229 # FIXME: pass debug option from opcode to backend
6230 result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
6231 self.op.debug_level)
6232 result.Raise("Could not add os for instance %s"
6233 " on node %s" % (instance, pnode_name))
6235 elif self.op.mode == constants.INSTANCE_IMPORT:
6236 feedback_fn("* running the instance OS import scripts...")
6237 src_node = self.op.src_node
6238 src_images = self.src_images
6239 cluster_name = self.cfg.GetClusterName()
6240 # FIXME: pass debug option from opcode to backend
6241 import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
6242 src_node, src_images,
6243 cluster_name,
6244 self.op.debug_level)
6245 msg = import_result.fail_msg
6246 if msg:
6247 self.LogWarning("Error while importing the disk images for instance"
6248 " %s on node %s: %s" % (instance, pnode_name, msg))
6249 else:
6250 # also checked in the prereq part
6251 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
6252 % self.op.mode)
6254 if self.op.start:
6255 iobj.admin_up = True
6256 self.cfg.Update(iobj, feedback_fn)
6257 logging.info("Starting instance %s on node %s", instance, pnode_name)
6258 feedback_fn("* starting instance...")
6259 result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
6260 result.Raise("Could not start instance")
6262 return list(iobj.all_nodes)
6265 class LUConnectConsole(NoHooksLU):
6266 """Connect to an instance's console.
6268 This is somewhat special in that it returns the command line that
6269 you need to run on the master node in order to connect to the
6270 console.
6272 """
6273 _OP_REQP = ["instance_name"]
6274 REQ_BGL = False
6276 def ExpandNames(self):
6277 self._ExpandAndLockInstance()
6279 def CheckPrereq(self):
6280 """Check prerequisites.
6282 This checks that the instance is in the cluster.
6284 """
6285 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6286 assert self.instance is not None, \
6287 "Cannot retrieve locked instance %s" % self.op.instance_name
6288 _CheckNodeOnline(self, self.instance.primary_node)
6290 def Exec(self, feedback_fn):
6291 """Connect to the console of an instance
6293 """
6294 instance = self.instance
6295 node = instance.primary_node
6297 node_insts = self.rpc.call_instance_list([node],
6298 [instance.hypervisor])[node]
6299 node_insts.Raise("Can't get node information from %s" % node)
6301 if instance.name not in node_insts.payload:
6302 raise errors.OpExecError("Instance %s is not running." % instance.name)
6304 logging.debug("Connecting to console of %s on %s", instance.name, node)
6306 hyper = hypervisor.GetHypervisor(instance.hypervisor)
6307 cluster = self.cfg.GetClusterInfo()
6308 # beparams and hvparams are passed separately, to avoid editing the
6309 # instance and then saving the defaults in the instance itself.
6310 hvparams = cluster.FillHV(instance)
6311 beparams = cluster.FillBE(instance)
6312 console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
6314 # build ssh cmdline
6315 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
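# Illustrative example (added, node and command hypothetical): for a Xen
# instance on node1 the returned value resembles the command line
# "ssh -t root@node1 <console_cmd>", which the caller then executes to
# attach to the instance console.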
6318 class LUReplaceDisks(LogicalUnit):
6319 """Replace the disks of an instance.
6321 """
6322 HPATH = "mirrors-replace"
6323 HTYPE = constants.HTYPE_INSTANCE
6324 _OP_REQP = ["instance_name", "mode", "disks"]
6325 REQ_BGL = False
6327 def CheckArguments(self):
6328 if not hasattr(self.op, "remote_node"):
6329 self.op.remote_node = None
6330 if not hasattr(self.op, "iallocator"):
6331 self.op.iallocator = None
6332 if not hasattr(self.op, "early_release"):
6333 self.op.early_release = False
6335 TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
6336 self.op.iallocator)
6338 def ExpandNames(self):
6339 self._ExpandAndLockInstance()
6341 if self.op.iallocator is not None:
6342 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6344 elif self.op.remote_node is not None:
6345 remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
6346 self.op.remote_node = remote_node
6348 # Warning: do not remove the locking of the new secondary here
6349 # unless DRBD8.AddChildren is changed to work in parallel;
6350 # currently it doesn't since parallel invocations of
6351 # FindUnusedMinor will conflict
6352 self.needed_locks[locking.LEVEL_NODE] = [remote_node]
6353 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6355 else:
6356 self.needed_locks[locking.LEVEL_NODE] = []
6357 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6359 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
6360 self.op.iallocator, self.op.remote_node,
6361 self.op.disks, False, self.op.early_release)
6363 self.tasklets = [self.replacer]
6365 def DeclareLocks(self, level):
6366 # If we're not already locking all nodes in the set we have to declare the
6367 # instance's primary/secondary nodes.
6368 if (level == locking.LEVEL_NODE and
6369 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6370 self._LockInstancesNodes()
6372 def BuildHooksEnv(self):
6373 """Build hooks env.
6375 This runs on the master, the primary and all the secondaries.
6377 """
6378 instance = self.replacer.instance
6379 env = {
6380 "MODE": self.op.mode,
6381 "NEW_SECONDARY": self.op.remote_node,
6382 "OLD_SECONDARY": instance.secondary_nodes[0],
6383 }
6384 env.update(_BuildInstanceHookEnvByObject(self, instance))
6385 nl = [
6386 self.cfg.GetMasterNode(),
6387 instance.primary_node,
6388 ]
6389 if self.op.remote_node is not None:
6390 nl.append(self.op.remote_node)
6391 return env, nl, nl
6394 class LUEvacuateNode(LogicalUnit):
6395 """Relocate the secondary instances from a node.
6397 """
6398 HPATH = "node-evacuate"
6399 HTYPE = constants.HTYPE_NODE
6400 _OP_REQP = ["node_name"]
6401 REQ_BGL = False
6403 def CheckArguments(self):
6404 if not hasattr(self.op, "remote_node"):
6405 self.op.remote_node = None
6406 if not hasattr(self.op, "iallocator"):
6407 self.op.iallocator = None
6408 if not hasattr(self.op, "early_release"):
6409 self.op.early_release = False
6411 TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
6412 self.op.remote_node,
6413 self.op.iallocator)
6415 def ExpandNames(self):
6416 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
6418 self.needed_locks = {}
6420 # Declare node locks
6421 if self.op.iallocator is not None:
6422 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6424 elif self.op.remote_node is not None:
6425 self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
6427 # Warning: do not remove the locking of the new secondary here
6428 # unless DRBD8.AddChildren is changed to work in parallel;
6429 # currently it doesn't since parallel invocations of
6430 # FindUnusedMinor will conflict
6431 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
6432 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
6434 else:
6435 raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL)
6437 # Create tasklets for replacing disks for all secondary instances on this
6438 # node
6439 names = []
6440 tasklets = []
6442 for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
6443 logging.debug("Replacing disks for instance %s", inst.name)
6444 names.append(inst.name)
6446 replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
6447 self.op.iallocator, self.op.remote_node, [],
6448 True, self.op.early_release)
6449 tasklets.append(replacer)
6451 self.tasklets = tasklets
6452 self.instance_names = names
6454 # Declare instance locks
6455 self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
6457 def DeclareLocks(self, level):
6458 # If we're not already locking all nodes in the set we have to declare the
6459 # instance's primary/secondary nodes.
6460 if (level == locking.LEVEL_NODE and
6461 self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
6462 self._LockInstancesNodes()
6464 def BuildHooksEnv(self):
6465 """Build hooks env.
6467 This runs on the master, the primary and all the secondaries.
6469 """
6470 env = {
6471 "NODE_NAME": self.op.node_name,
6472 }
6474 nl = [self.cfg.GetMasterNode()]
6476 if self.op.remote_node is not None:
6477 env["NEW_SECONDARY"] = self.op.remote_node
6478 nl.append(self.op.remote_node)
6480 return (env, nl, nl)
6483 class TLReplaceDisks(Tasklet):
6484 """Replaces disks for an instance.
6486 Note: Locking is not within the scope of this class.
6488 """
6489 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
6490 disks, delay_iallocator, early_release):
6491 """Initializes this class.
6493 """
6494 Tasklet.__init__(self, lu)
6496 # Parameters
6497 self.instance_name = instance_name
6498 self.mode = mode
6499 self.iallocator_name = iallocator_name
6500 self.remote_node = remote_node
6501 self.disks = disks
6502 self.delay_iallocator = delay_iallocator
6503 self.early_release = early_release
6505 # Runtime data
6506 self.instance = None
6507 self.new_node = None
6508 self.target_node = None
6509 self.other_node = None
6510 self.remote_node_info = None
6511 self.node_secondary_ip = None
6513 @staticmethod
6514 def CheckArguments(mode, remote_node, iallocator):
6515 """Helper function for users of this class.
6517 """
6518 # check for valid parameter combination
6519 if mode == constants.REPLACE_DISK_CHG:
6520 if remote_node is None and iallocator is None:
6521 raise errors.OpPrereqError("When changing the secondary either an"
6522 " iallocator script must be used or the"
6523 " new node given", errors.ECODE_INVAL)
6525 if remote_node is not None and iallocator is not None:
6526 raise errors.OpPrereqError("Give either the iallocator or the new"
6527 " secondary, not both", errors.ECODE_INVAL)
6529 elif remote_node is not None or iallocator is not None:
6530 # Not replacing the secondary
6531 raise errors.OpPrereqError("The iallocator and new node options can"
6532 " only be used when changing the"
6533 " secondary node", errors.ECODE_INVAL)
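# Summary of the combinations accepted above (added for clarity):
# REPLACE_DISK_CHG requires exactly one of remote_node or iallocator;
# every other mode requires both to be unset.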
6535 @staticmethod
6536 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
6537 """Compute a new secondary node using an IAllocator.
6539 """
6540 ial = IAllocator(lu.cfg, lu.rpc,
6541 mode=constants.IALLOCATOR_MODE_RELOC,
6542 name=instance_name,
6543 relocate_from=relocate_from)
6545 ial.Run(iallocator_name)
6547 if not ial.success:
6548 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
6549 " %s" % (iallocator_name, ial.info),
6552 if len(ial.result) != ial.required_nodes:
6553 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
6554 " of nodes (%s), required %s" %
6555 (iallocator_name,
6556 len(ial.result), ial.required_nodes),
6557 errors.ECODE_FAULT)
6559 remote_node_name = ial.result[0]
6561 lu.LogInfo("Selected new secondary for instance '%s': %s",
6562 instance_name, remote_node_name)
6564 return remote_node_name
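# Illustrative usage (added, names hypothetical):
# _RunAllocator(lu, "hail", "instance1.example.com", ["node2"]) asks the
# allocator for a single replacement secondary and returns its name,
# e.g. "node3".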
6566 def _FindFaultyDisks(self, node_name):
6567 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
6568 node_name, True)
6570 def CheckPrereq(self):
6571 """Check prerequisites.
6573 This checks that the instance is in the cluster.
6575 """
6576 self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
6577 assert instance is not None, \
6578 "Cannot retrieve locked instance %s" % self.instance_name
6580 if instance.disk_template != constants.DT_DRBD8:
6581 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
6582 " instances", errors.ECODE_INVAL)
6584 if len(instance.secondary_nodes) != 1:
6585 raise errors.OpPrereqError("The instance has a strange layout,"
6586 " expected one secondary but found %d" %
6587 len(instance.secondary_nodes),
6588 errors.ECODE_STATE)
6590 if not self.delay_iallocator:
6591 self._CheckPrereq2()
6593 def _CheckPrereq2(self):
6594 """Check prerequisites, second part.
6596 This function should always be part of CheckPrereq. It was separated and is
6597 now called from Exec because during node evacuation iallocator was only
6598 called with an unmodified cluster model, not taking planned changes into
6599 account.
6601 """
6602 instance = self.instance
6603 secondary_node = instance.secondary_nodes[0]
6605 if self.iallocator_name is None:
6606 remote_node = self.remote_node
6607 else:
6608 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
6609 instance.name, instance.secondary_nodes)
6611 if remote_node is not None:
6612 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
6613 assert self.remote_node_info is not None, \
6614 "Cannot retrieve locked node %s" % remote_node
6616 self.remote_node_info = None
6618 if remote_node == self.instance.primary_node:
6619 raise errors.OpPrereqError("The specified node is the primary node of"
6620 " the instance.", errors.ECODE_INVAL)
6622 if remote_node == secondary_node:
6623 raise errors.OpPrereqError("The specified node is already the"
6624 " secondary node of the instance.",
6627 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
6628 constants.REPLACE_DISK_CHG):
6629 raise errors.OpPrereqError("Cannot specify disks to be replaced",
6630 errors.ECODE_INVAL)
6632 if self.mode == constants.REPLACE_DISK_AUTO:
6633 faulty_primary = self._FindFaultyDisks(instance.primary_node)
6634 faulty_secondary = self._FindFaultyDisks(secondary_node)
6636 if faulty_primary and faulty_secondary:
6637 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
6638 " one node and can not be repaired"
6639 " automatically" % self.instance_name,
6643 self.disks = faulty_primary
6644 self.target_node = instance.primary_node
6645 self.other_node = secondary_node
6646 check_nodes = [self.target_node, self.other_node]
6647 elif faulty_secondary:
6648 self.disks = faulty_secondary
6649 self.target_node = secondary_node
6650 self.other_node = instance.primary_node
6651 check_nodes = [self.target_node, self.other_node]
6652 else:
6653 self.disks = []
6654 check_nodes = []
6656 else:
6657 # Non-automatic modes
6658 if self.mode == constants.REPLACE_DISK_PRI:
6659 self.target_node = instance.primary_node
6660 self.other_node = secondary_node
6661 check_nodes = [self.target_node, self.other_node]
6663 elif self.mode == constants.REPLACE_DISK_SEC:
6664 self.target_node = secondary_node
6665 self.other_node = instance.primary_node
6666 check_nodes = [self.target_node, self.other_node]
6668 elif self.mode == constants.REPLACE_DISK_CHG:
6669 self.new_node = remote_node
6670 self.other_node = instance.primary_node
6671 self.target_node = secondary_node
6672 check_nodes = [self.new_node, self.other_node]
6674 _CheckNodeNotDrained(self.lu, remote_node)
6676 old_node_info = self.cfg.GetNodeInfo(secondary_node)
6677 assert old_node_info is not None
6678 if old_node_info.offline and not self.early_release:
6679 # doesn't make sense to delay the release
6680 self.early_release = True
6681 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
6682 " early-release mode", secondary_node)
6685 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
6686 self.mode)
6688 # If not specified all disks should be replaced
6689 if not self.disks:
6690 self.disks = range(len(self.instance.disks))
6692 for node in check_nodes:
6693 _CheckNodeOnline(self.lu, node)
6695 # Check whether disks are valid
6696 for disk_idx in self.disks:
6697 instance.FindDisk(disk_idx)
6699 # Get secondary node IP addresses
6700 node_2nd_ip = {}
6702 for node_name in [self.target_node, self.other_node, self.new_node]:
6703 if node_name is not None:
6704 node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
6706 self.node_secondary_ip = node_2nd_ip
6708 def Exec(self, feedback_fn):
6709 """Execute disk replacement.
6711 This dispatches the disk replacement to the appropriate handler.
6713 """
6714 if self.delay_iallocator:
6715 self._CheckPrereq2()
6717 if not self.disks:
6718 feedback_fn("No disks need replacement")
6719 return
6721 feedback_fn("Replacing disk(s) %s for %s" %
6722 (utils.CommaJoin(self.disks), self.instance.name))
6724 activate_disks = (not self.instance.admin_up)
6726 # Activate the instance disks if we're replacing them on a down instance
6727 if activate_disks:
6728 _StartInstanceDisks(self.lu, self.instance, True)
6730 try:
6731 # Should we replace the secondary node?
6732 if self.new_node is not None:
6733 fn = self._ExecDrbd8Secondary
6734 else:
6735 fn = self._ExecDrbd8DiskOnly
6737 return fn(feedback_fn)
6739 finally:
6740 # Deactivate the instance disks if we're replacing them on a
6741 # down instance
6742 if activate_disks:
6743 _SafeShutdownInstanceDisks(self.lu, self.instance)
6745 def _CheckVolumeGroup(self, nodes):
6746 self.lu.LogInfo("Checking volume groups")
6748 vgname = self.cfg.GetVGName()
6750 # Make sure volume group exists on all involved nodes
6751 results = self.rpc.call_vg_list(nodes)
6752 if not results:
6753 raise errors.OpExecError("Can't list volume groups on the nodes")
6755 for node in nodes:
6756 res = results[node]
6757 res.Raise("Error checking node %s" % node)
6758 if vgname not in res.payload:
6759 raise errors.OpExecError("Volume group '%s' not found on node %s" %
6760 (vgname, node))
6762 def _CheckDisksExistence(self, nodes):
6763 # Check disk existence
6764 for idx, dev in enumerate(self.instance.disks):
6765 if idx not in self.disks:
6766 continue
6768 for node in nodes:
6769 self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
6770 self.cfg.SetDiskID(dev, node)
6772 result = self.rpc.call_blockdev_find(node, dev)
6774 msg = result.fail_msg
6775 if msg or not result.payload:
6776 if not msg:
6777 msg = "disk not found"
6778 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
6779 (idx, node, msg))
6781 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
6782 for idx, dev in enumerate(self.instance.disks):
6783 if idx not in self.disks:
6784 continue
6786 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
6787 (idx, node_name))
6789 if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
6790 ldisk=ldisk):
6791 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
6792 " replace disks for instance %s" %
6793 (node_name, self.instance.name))
6795 def _CreateNewStorage(self, node_name):
6796 vgname = self.cfg.GetVGName()
6798 iv_names = {}
6799 for idx, dev in enumerate(self.instance.disks):
6800 if idx not in self.disks:
6801 continue
6803 self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
6805 self.cfg.SetDiskID(dev, node_name)
6807 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
6808 names = _GenerateUniqueNames(self.lu, lv_names)
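# Illustrative example (added, uuid hypothetical): for disk/0 the
# lv_names are [".disk0_data", ".disk0_meta"], which _GenerateUniqueNames
# turns into unique volume names along the lines of "<uuid>.disk0_data".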
6810 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
6811 logical_id=(vgname, names[0]))
6812 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
6813 logical_id=(vgname, names[1]))
6815 new_lvs = [lv_data, lv_meta]
6816 old_lvs = dev.children
6817 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
6819 # we pass force_create=True to force the LVM creation
6820 for new_lv in new_lvs:
6821 _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
6822 _GetInstanceInfoText(self.instance), False)
6824 return iv_names
6826 def _CheckDevices(self, node_name, iv_names):
6827 for name, (dev, _, _) in iv_names.iteritems():
6828 self.cfg.SetDiskID(dev, node_name)
6830 result = self.rpc.call_blockdev_find(node_name, dev)
6832 msg = result.fail_msg
6833 if msg or not result.payload:
6834 if not msg:
6835 msg = "disk not found"
6836 raise errors.OpExecError("Can't find DRBD device %s: %s" %
6837 (name, msg))
6839 if result.payload.is_degraded:
6840 raise errors.OpExecError("DRBD device %s is degraded!" % name)
6842 def _RemoveOldStorage(self, node_name, iv_names):
6843 for name, (_, old_lvs, _) in iv_names.iteritems():
6844 self.lu.LogInfo("Remove logical volumes for %s" % name)
6846 for lv in old_lvs:
6847 self.cfg.SetDiskID(lv, node_name)
6849 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
6850 if msg:
6851 self.lu.LogWarning("Can't remove old LV: %s" % msg,
6852 hint="remove unused LVs manually")
6854 def _ReleaseNodeLock(self, node_name):
6855 """Releases the lock for a given node."""
6856 self.lu.context.glm.release(locking.LEVEL_NODE, node_name)
6858 def _ExecDrbd8DiskOnly(self, feedback_fn):
6859 """Replace a disk on the primary or secondary for DRBD 8.
6861 The algorithm for replace is quite complicated:
6863 1. for each disk to be replaced:
6865 1. create new LVs on the target node with unique names
6866 1. detach old LVs from the drbd device
6867 1. rename old LVs to name_replaced.<time_t>
6868 1. rename new LVs to old LVs
6869 1. attach the new LVs (with the old names now) to the drbd device
6871 1. wait for sync across all devices
6873 1. for each modified disk:
6875 1. remove old LVs (which have the name name_replaced.<time_t>)
6877 Failures are not very well handled.
6879 """
6880 steps_total = 6
6882 # Step: check device activation
6883 self.lu.LogStep(1, steps_total, "Check device existence")
6884 self._CheckDisksExistence([self.other_node, self.target_node])
6885 self._CheckVolumeGroup([self.target_node, self.other_node])
6887 # Step: check other node consistency
6888 self.lu.LogStep(2, steps_total, "Check peer consistency")
6889 self._CheckDisksConsistency(self.other_node,
6890 self.other_node == self.instance.primary_node,
6891 False)
6893 # Step: create new storage
6894 self.lu.LogStep(3, steps_total, "Allocate new storage")
6895 iv_names = self._CreateNewStorage(self.target_node)
6897 # Step: for each lv, detach+rename*2+attach
6898 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6899 for dev, old_lvs, new_lvs in iv_names.itervalues():
6900 self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
6902 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
6903 old_lvs)
6904 result.Raise("Can't detach drbd from local storage on node"
6905 " %s for device %s" % (self.target_node, dev.iv_name))
6907 #cfg.Update(instance)
6909 # ok, we created the new LVs, so now we know we have the needed
6910 # storage; as such, we proceed on the target node to rename
6911 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
6912 # using the assumption that logical_id == physical_id (which in
6913 # turn is the unique_id on that node)
6915 # FIXME(iustin): use a better name for the replaced LVs
6916 temp_suffix = int(time.time())
6917 ren_fn = lambda d, suff: (d.physical_id[0],
6918 d.physical_id[1] + "_replaced-%s" % suff)
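# Illustrative example (added, values hypothetical): a physical_id of
# ("xenvg", "disk0_data") with suffix 1234567890 is renamed to
# ("xenvg", "disk0_data_replaced-1234567890").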
6920 # Build the rename list based on what LVs exist on the node
6921 rename_old_to_new = []
6922 for to_ren in old_lvs:
6923 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
6924 if not result.fail_msg and result.payload:
6925 # device exists
6926 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
6928 self.lu.LogInfo("Renaming the old LVs on the target node")
6929 result = self.rpc.call_blockdev_rename(self.target_node,
6930 rename_old_to_new)
6931 result.Raise("Can't rename old LVs on node %s" % self.target_node)
6933 # Now we rename the new LVs to the old LVs
6934 self.lu.LogInfo("Renaming the new LVs on the target node")
6935 rename_new_to_old = [(new, old.physical_id)
6936 for old, new in zip(old_lvs, new_lvs)]
6937 result = self.rpc.call_blockdev_rename(self.target_node,
6938 rename_new_to_old)
6939 result.Raise("Can't rename new LVs on node %s" % self.target_node)
6941 for old, new in zip(old_lvs, new_lvs):
6942 new.logical_id = old.logical_id
6943 self.cfg.SetDiskID(new, self.target_node)
6945 for disk in old_lvs:
6946 disk.logical_id = ren_fn(disk, temp_suffix)
6947 self.cfg.SetDiskID(disk, self.target_node)
6949 # Now that the new lvs have the old name, we can add them to the device
6950 self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
6951 result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
6952 new_lvs)
6953 msg = result.fail_msg
6954 if msg:
6955 for new_lv in new_lvs:
6956 msg2 = self.rpc.call_blockdev_remove(self.target_node,
6957 new_lv).fail_msg
6958 if msg2:
6959 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6960 hint=("cleanup manually the unused logical"
6961 " volumes"))
6962 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6964 dev.children = new_lvs
6966 self.cfg.Update(self.instance, feedback_fn)
6968 cstep = 5
6969 if self.early_release:
6970 self.lu.LogStep(cstep, steps_total, "Removing old storage")
6971 cstep += 1
6972 self._RemoveOldStorage(self.target_node, iv_names)
6973 # WARNING: we release both node locks here, do not do other RPCs
6974 # than WaitForSync to the primary node
6975 self._ReleaseNodeLock([self.target_node, self.other_node])
6977 # Wait for sync
6978 # This can fail as the old devices are degraded and _WaitForSync
6979 # does a combined result over all disks, so we don't check its return value
6980 self.lu.LogStep(cstep, steps_total, "Sync devices")
6981 cstep += 1
6982 _WaitForSync(self.lu, self.instance)
6984 # Check all devices manually
6985 self._CheckDevices(self.instance.primary_node, iv_names)
6987 # Step: remove old storage
6988 if not self.early_release:
6989 self.lu.LogStep(cstep, steps_total, "Removing old storage")
6990 cstep += 1
6991 self._RemoveOldStorage(self.target_node, iv_names)
6993 def _ExecDrbd8Secondary(self, feedback_fn):
6994 """Replace the secondary node for DRBD 8.
6996 The algorithm for replace is quite complicated:
6997 - for all disks of the instance:
6998 - create new LVs on the new node with same names
6999 - shutdown the drbd device on the old secondary
7000 - disconnect the drbd network on the primary
7001 - create the drbd device on the new secondary
7002 - network attach the drbd on the primary, using an artifice:
7003 the drbd code for Attach() will connect to the network if it
7004 finds a device which is connected to the good local disks but
7005 not network enabled
7006 - wait for sync across all devices
7007 - remove all disks from the old secondary
7009 Failures are not very well handled.
7011 """
7012 steps_total = 6
7014 # Step: check device activation
7015 self.lu.LogStep(1, steps_total, "Check device existence")
7016 self._CheckDisksExistence([self.instance.primary_node])
7017 self._CheckVolumeGroup([self.instance.primary_node])
7019 # Step: check other node consistency
7020 self.lu.LogStep(2, steps_total, "Check peer consistency")
7021 self._CheckDisksConsistency(self.instance.primary_node, True, True)
7023 # Step: create new storage
7024 self.lu.LogStep(3, steps_total, "Allocate new storage")
7025 for idx, dev in enumerate(self.instance.disks):
7026 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
7027 (self.new_node, idx))
7028 # we pass force_create=True to force LVM creation
7029 for new_lv in dev.children:
7030 _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
7031 _GetInstanceInfoText(self.instance), False)
7033 # Step 4: dbrd minors and drbd setups changes
7034 # after this, we must manually remove the drbd minors on both the
7035 # error and the success paths
7036 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
7037 minors = self.cfg.AllocateDRBDMinor([self.new_node
7038 for dev in self.instance.disks],
7039 self.instance.name)
7040 logging.debug("Allocated minors %r", minors)
7042 iv_names = {}
7043 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
7044 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
7045 (self.new_node, idx))
7046 # create new devices on new_node; note that we create two IDs:
7047 # one without port, so the drbd will be activated without
7048 # networking information on the new node at this stage, and one
7049 # with network, for the latter activation in step 4
7050 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
7051 if self.instance.primary_node == o_node1:
7052 p_minor = o_minor1
7053 else:
7054 assert self.instance.primary_node == o_node2, "Three-node instance?"
7055 p_minor = o_minor2
7057 new_alone_id = (self.instance.primary_node, self.new_node, None,
7058 p_minor, new_minor, o_secret)
7059 new_net_id = (self.instance.primary_node, self.new_node, o_port,
7060 p_minor, new_minor, o_secret)
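# Illustrative example (added, values hypothetical): with primary "node1",
# new node "node3", port 11000 and minors (0, 1), new_alone_id is
# ("node1", "node3", None, 0, 1, o_secret) and new_net_id is the same
# tuple with 11000 instead of None; the port-less id keeps the new drbd
# off the network until the attach step below.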
7062 iv_names[idx] = (dev, dev.children, new_net_id)
7063 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
7064 new_net_id)
7065 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
7066 logical_id=new_alone_id,
7067 children=dev.children,
7068 size=dev.size)
7069 try:
7070 _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
7071 _GetInstanceInfoText(self.instance), False)
7072 except errors.GenericError:
7073 self.cfg.ReleaseDRBDMinors(self.instance.name)
7074 raise
7076 # We have new devices, shutdown the drbd on the old secondary
7077 for idx, dev in enumerate(self.instance.disks):
7078 self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
7079 self.cfg.SetDiskID(dev, self.target_node)
7080 msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
7081 if msg:
7082 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
7083 " node: %s" % (idx, msg),
7084 hint=("Please cleanup this device manually as"
7085 " soon as possible"))
7087 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
7088 result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node],
7089 self.node_secondary_ip,
7090 self.instance.disks)\
7091 [self.instance.primary_node]
7093 msg = result.fail_msg
7094 if msg:
7095 # detaches didn't succeed (unlikely)
7096 self.cfg.ReleaseDRBDMinors(self.instance.name)
7097 raise errors.OpExecError("Can't detach the disks from the network on"
7098 " old node: %s" % (msg,))
7100 # if we managed to detach at least one, we update all the disks of
7101 # the instance to point to the new secondary
7102 self.lu.LogInfo("Updating instance configuration")
7103 for dev, _, new_logical_id in iv_names.itervalues():
7104 dev.logical_id = new_logical_id
7105 self.cfg.SetDiskID(dev, self.instance.primary_node)
7107 self.cfg.Update(self.instance, feedback_fn)
7109 # and now perform the drbd attach
7110 self.lu.LogInfo("Attaching primary drbds to new secondary"
7111 " (standalone => connected)")
7112 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
7113 self.new_node],
7114 self.node_secondary_ip,
7115 self.instance.disks,
7116 self.instance.name,
7117 False)
7118 for to_node, to_result in result.items():
7119 msg = to_result.fail_msg
7120 if msg:
7121 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
7122 to_node, msg,
7123 hint=("please do a gnt-instance info to see the"
7124 " status of disks"))
7125 cstep = 5
7126 if self.early_release:
7127 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7128 cstep += 1
7129 self._RemoveOldStorage(self.target_node, iv_names)
7130 # WARNING: we release all node locks here, do not do other RPCs
7131 # than WaitForSync to the primary node
7132 self._ReleaseNodeLock([self.instance.primary_node,
7133 self.target_node,
7134 self.new_node])
7136 # Wait for sync
7137 # This can fail as the old devices are degraded and _WaitForSync
7138 # does a combined result over all disks, so we don't check its return value
7139 self.lu.LogStep(cstep, steps_total, "Sync devices")
7140 cstep += 1
7141 _WaitForSync(self.lu, self.instance)
7143 # Check all devices manually
7144 self._CheckDevices(self.instance.primary_node, iv_names)
7146 # Step: remove old storage
7147 if not self.early_release:
7148 self.lu.LogStep(cstep, steps_total, "Removing old storage")
7149 self._RemoveOldStorage(self.target_node, iv_names)
7152 class LURepairNodeStorage(NoHooksLU):
7153 """Repairs the volume group on a node.
7155 """
7156 _OP_REQP = ["node_name"]
7157 REQ_BGL = False
7159 def CheckArguments(self):
7160 self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
7162 def ExpandNames(self):
7163 self.needed_locks = {
7164 locking.LEVEL_NODE: [self.op.node_name],
7165 }
7167 def _CheckFaultyDisks(self, instance, node_name):
7168 """Ensure faulty disks abort the opcode or at least warn."""
7169 try:
7170 if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
7171 node_name, True):
7172 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
7173 " node '%s'" % (instance.name, node_name),
7174 errors.ECODE_STATE)
7175 except errors.OpPrereqError, err:
7176 if self.op.ignore_consistency:
7177 self.proc.LogWarning(str(err.args[0]))
7178 else:
7179 raise
7181 def CheckPrereq(self):
7182 """Check prerequisites.
7184 """
7185 storage_type = self.op.storage_type
7187 if (constants.SO_FIX_CONSISTENCY not in
7188 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
7189 raise errors.OpPrereqError("Storage units of type '%s' can not be"
7190 " repaired" % storage_type,
7193 # Check whether any instance on this node has faulty disks
7194 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
7195 if not inst.admin_up:
7196 continue
7197 check_nodes = set(inst.all_nodes)
7198 check_nodes.discard(self.op.node_name)
7199 for inst_node_name in check_nodes:
7200 self._CheckFaultyDisks(inst, inst_node_name)
7202 def Exec(self, feedback_fn):
7203 feedback_fn("Repairing storage unit '%s' on %s ..." %
7204 (self.op.name, self.op.node_name))
7206 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
7207 result = self.rpc.call_storage_execute(self.op.node_name,
7208 self.op.storage_type, st_args,
7209 self.op.name,
7210 constants.SO_FIX_CONSISTENCY)
7211 result.Raise("Failed to repair storage unit '%s' on %s" %
7212 (self.op.name, self.op.node_name))
7215 class LUGrowDisk(LogicalUnit):
7216 """Grow a disk of an instance.
7218 """
7219 HPATH = "grow-disk"
7220 HTYPE = constants.HTYPE_INSTANCE
7221 _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
7222 REQ_BGL = False
7224 def ExpandNames(self):
7225 self._ExpandAndLockInstance()
7226 self.needed_locks[locking.LEVEL_NODE] = []
7227 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7229 def DeclareLocks(self, level):
7230 if level == locking.LEVEL_NODE:
7231 self._LockInstancesNodes()
7233 def BuildHooksEnv(self):
7234 """Build hooks env.
7236 This runs on the master, the primary and all the secondaries.
7238 """
7239 env = {
7240 "DISK": self.op.disk,
7241 "AMOUNT": self.op.amount,
7242 }
7243 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
7244 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
7245 return env, nl, nl
7247 def CheckPrereq(self):
7248 """Check prerequisites.
7250 This checks that the instance is in the cluster.
7252 """
7253 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
7254 assert instance is not None, \
7255 "Cannot retrieve locked instance %s" % self.op.instance_name
7256 nodenames = list(instance.all_nodes)
7257 for node in nodenames:
7258 _CheckNodeOnline(self, node)
7261 self.instance = instance
7263 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
7264 raise errors.OpPrereqError("Instance's disk layout does not support"
7265 " growing.", errors.ECODE_INVAL)
7267 self.disk = instance.FindDisk(self.op.disk)
7269 nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
7270 instance.hypervisor)
7271 for node in nodenames:
7272 info = nodeinfo[node]
7273 info.Raise("Cannot get current information from node %s" % node)
7274 vg_free = info.payload.get('vg_free', None)
7275 if not isinstance(vg_free, int):
7276 raise errors.OpPrereqError("Can't compute free disk space on"
7277 " node %s" % node, errors.ECODE_ENVIRON)
7278 if self.op.amount > vg_free:
7279 raise errors.OpPrereqError("Not enough disk space on target node %s:"
7280 " %d MiB available, %d MiB required" %
7281 (node, vg_free, self.op.amount),
7282 errors.ECODE_NORES)
7284 def Exec(self, feedback_fn):
7285 """Execute disk grow.
7287 """
7288 instance = self.instance
7289 disk = self.disk
7290 for node in instance.all_nodes:
7291 self.cfg.SetDiskID(disk, node)
7292 result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
7293 result.Raise("Grow request failed to node %s" % node)
7295 # TODO: Rewrite code to work properly
7296 # DRBD goes into sync mode for a short amount of time after executing the
7297 # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
7298 # calling "resize" in sync mode fails. Sleeping for a short amount of
7299 # time is a work-around.
7300 time.sleep(5)
7302 disk.RecordGrow(self.op.amount)
7303 self.cfg.Update(instance, feedback_fn)
7304 if self.op.wait_for_sync:
7305 disk_abort = not _WaitForSync(self, instance)
7306 if disk_abort:
7307 self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
7308 " status.\nPlease check the instance.")
7311 class LUQueryInstanceData(NoHooksLU):
7312 """Query runtime instance data.
7314 """
7315 _OP_REQP = ["instances", "static"]
7316 REQ_BGL = False
7318 def ExpandNames(self):
7319 self.needed_locks = {}
7320 self.share_locks = dict.fromkeys(locking.LEVELS, 1)
7322 if not isinstance(self.op.instances, list):
7323 raise errors.OpPrereqError("Invalid argument type 'instances'",
7324 errors.ECODE_INVAL)
7326 if self.op.instances:
7327 self.wanted_names = []
7328 for name in self.op.instances:
7329 full_name = _ExpandInstanceName(self.cfg, name)
7330 self.wanted_names.append(full_name)
7331 self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
7332 else:
7333 self.wanted_names = None
7334 self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
7336 self.needed_locks[locking.LEVEL_NODE] = []
7337 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7339 def DeclareLocks(self, level):
7340 if level == locking.LEVEL_NODE:
7341 self._LockInstancesNodes()
7343 def CheckPrereq(self):
7344 """Check prerequisites.
7346 This only checks the optional instance list against the existing names.
7348 """
7349 if self.wanted_names is None:
7350 self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
7352 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
7353 in self.wanted_names]
7356 def _ComputeBlockdevStatus(self, node, instance_name, dev):
7357 """Returns the status of a block device
7359 """
7360 if self.op.static or not node:
7361 return None
7363 self.cfg.SetDiskID(dev, node)
7365 result = self.rpc.call_blockdev_find(node, dev)
7366 if result.offline:
7367 return None
7369 result.Raise("Can't compute disk status for %s" % instance_name)
7371 status = result.payload
7372 if status is None:
7373 return None
7375 return (status.dev_path, status.major, status.minor,
7376 status.sync_percent, status.estimated_time,
7377 status.is_degraded, status.ldisk_status)
7379 def _ComputeDiskStatus(self, instance, snode, dev):
7380 """Compute block device status.
7382 """
7383 if dev.dev_type in constants.LDS_DRBD:
7384 # we change the snode then (otherwise we use the one passed in)
7385 if dev.logical_id[0] == instance.primary_node:
7386 snode = dev.logical_id[1]
7387 else:
7388 snode = dev.logical_id[0]
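# Note (added for clarity): a DRBD logical_id is the tuple
# (nodeA, nodeB, port, minorA, minorB, secret), so whichever of the first
# two entries is not the primary node is used as the secondary here.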
7390 dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
7391 instance.name, dev)
7392 dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
7394 if dev.children:
7395 dev_children = [self._ComputeDiskStatus(instance, snode, child)
7396 for child in dev.children]
7397 else:
7398 dev_children = []
7400 data = {
7401 "iv_name": dev.iv_name,
7402 "dev_type": dev.dev_type,
7403 "logical_id": dev.logical_id,
7404 "physical_id": dev.physical_id,
7405 "pstatus": dev_pstatus,
7406 "sstatus": dev_sstatus,
7407 "children": dev_children,
7408 "mode": dev.mode,
7409 "size": dev.size,
7410 }
7412 return data
7414 def Exec(self, feedback_fn):
7415 """Gather and return data"""
7416 result = {}
7418 cluster = self.cfg.GetClusterInfo()
7420 for instance in self.wanted_instances:
7421 if not self.op.static:
7422 remote_info = self.rpc.call_instance_info(instance.primary_node,
7423 instance.name,
7424 instance.hypervisor)
7425 remote_info.Raise("Error checking node %s" % instance.primary_node)
7426 remote_info = remote_info.payload
7427 if remote_info and "state" in remote_info:
7428 remote_state = "up"
7429 else:
7430 remote_state = "down"
7431 else:
7432 remote_state = None
7433 if instance.admin_up:
7434 config_state = "up"
7435 else:
7436 config_state = "down"
7438 disks = [self._ComputeDiskStatus(instance, None, device)
7439 for device in instance.disks]
7441 idict = {
7442 "name": instance.name,
7443 "config_state": config_state,
7444 "run_state": remote_state,
7445 "pnode": instance.primary_node,
7446 "snodes": instance.secondary_nodes,
7447 "os": instance.os,
7448 # this happens to be the same format used for hooks
7449 "nics": _NICListToTuple(self, instance.nics),
7450 "disks": disks,
7451 "hypervisor": instance.hypervisor,
7452 "network_port": instance.network_port,
7453 "hv_instance": instance.hvparams,
7454 "hv_actual": cluster.FillHV(instance, skip_globals=True),
7455 "be_instance": instance.beparams,
7456 "be_actual": cluster.FillBE(instance),
7457 "serial_no": instance.serial_no,
7458 "mtime": instance.mtime,
7459 "ctime": instance.ctime,
7460 "uuid": instance.uuid,
7461 }
7463 result[instance.name] = idict
7465 return result
7468 class LUSetInstanceParams(LogicalUnit):
7469 """Modifies an instance's parameters.
7471 """
7472 HPATH = "instance-modify"
7473 HTYPE = constants.HTYPE_INSTANCE
7474 _OP_REQP = ["instance_name"]
7475 REQ_BGL = False
7477 def CheckArguments(self):
7478 if not hasattr(self.op, 'nics'):
7479 self.op.nics = []
7480 if not hasattr(self.op, 'disks'):
7481 self.op.disks = []
7482 if not hasattr(self.op, 'beparams'):
7483 self.op.beparams = {}
7484 if not hasattr(self.op, 'hvparams'):
7485 self.op.hvparams = {}
7486 self.op.force = getattr(self.op, "force", False)
7487 if not (self.op.nics or self.op.disks or
7488 self.op.hvparams or self.op.beparams):
7489 raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
7491 if self.op.hvparams:
7492 _CheckGlobalHvParams(self.op.hvparams)
7494 # Disk validation
7495 disk_addremove = 0
7496 for disk_op, disk_dict in self.op.disks:
7497 if disk_op == constants.DDM_REMOVE:
7498 disk_addremove += 1
7499 continue
7500 elif disk_op == constants.DDM_ADD:
7501 disk_addremove += 1
7502 else:
7503 if not isinstance(disk_op, int):
7504 raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL)
7505 if not isinstance(disk_dict, dict):
7506 msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
7507 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7509 if disk_op == constants.DDM_ADD:
7510 mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
7511 if mode not in constants.DISK_ACCESS_SET:
7512 raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
7513 errors.ECODE_INVAL)
7514 size = disk_dict.get('size', None)
7515 if size is None:
7516 raise errors.OpPrereqError("Required disk parameter size missing",
7517 errors.ECODE_INVAL)
7518 try:
7519 size = int(size)
7520 except (TypeError, ValueError), err:
7521 raise errors.OpPrereqError("Invalid disk size parameter: %s" %
7522 str(err), errors.ECODE_INVAL)
7523 disk_dict['size'] = size
7524 else:
7525 # modification of disk
7526 if 'size' in disk_dict:
7527 raise errors.OpPrereqError("Disk size change not possible, use"
7528 " grow-disk", errors.ECODE_INVAL)
7530 if disk_addremove > 1:
7531 raise errors.OpPrereqError("Only one disk add or remove operation"
7532 " supported at a time", errors.ECODE_INVAL)
7534 # NIC validation
7535 nic_addremove = 0
7536 for nic_op, nic_dict in self.op.nics:
7537 if nic_op == constants.DDM_REMOVE:
7538 nic_addremove += 1
7539 continue
7540 elif nic_op == constants.DDM_ADD:
7541 nic_addremove += 1
7542 else:
7543 if not isinstance(nic_op, int):
7544 raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL)
7545 if not isinstance(nic_dict, dict):
7546 msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
7547 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
7549 # nic_dict should be a dict
7550 nic_ip = nic_dict.get('ip', None)
7551 if nic_ip is not None:
7552 if nic_ip.lower() == constants.VALUE_NONE:
7553 nic_dict['ip'] = None
7554 else:
7555 if not utils.IsValidIP(nic_ip):
7556 raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
7557 errors.ECODE_INVAL)
7559 nic_bridge = nic_dict.get('bridge', None)
7560 nic_link = nic_dict.get('link', None)
7561 if nic_bridge and nic_link:
7562 raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
7563 " at the same time", errors.ECODE_INVAL)
7564 elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
7565 nic_dict['bridge'] = None
7566 elif nic_link and nic_link.lower() == constants.VALUE_NONE:
7567 nic_dict['link'] = None
7569 if nic_op == constants.DDM_ADD:
7570 nic_mac = nic_dict.get('mac', None)
7571 if nic_mac is None:
7572 nic_dict['mac'] = constants.VALUE_AUTO
7574 if 'mac' in nic_dict:
7575 nic_mac = nic_dict['mac']
7576 if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
7577 nic_mac = utils.NormalizeAndValidateMac(nic_mac)
7579 if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
7580 raise errors.OpPrereqError("'auto' is not a valid MAC address when"
7581 " modifying an existing nic",
7582 errors.ECODE_INVAL)
7584 if nic_addremove > 1:
7585 raise errors.OpPrereqError("Only one NIC add or remove operation"
7586 " supported at a time", errors.ECODE_INVAL)
7588 def ExpandNames(self):
7589 self._ExpandAndLockInstance()
7590 self.needed_locks[locking.LEVEL_NODE] = []
7591 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
7593 def DeclareLocks(self, level):
7594 if level == locking.LEVEL_NODE:
7595 self._LockInstancesNodes()
7597 def BuildHooksEnv(self):
7598 """Build hooks env.
7600 This runs on the master, primary and secondaries.
7602 """
7603 args = dict()
7604 if constants.BE_MEMORY in self.be_new:
7605 args['memory'] = self.be_new[constants.BE_MEMORY]
7606 if constants.BE_VCPUS in self.be_new:
7607 args['vcpus'] = self.be_new[constants.BE_VCPUS]
7608 # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
7609 # information at all.
7612 nic_override = dict(self.op.nics)
7613 c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
7614 for idx, nic in enumerate(self.instance.nics):
7615 if idx in nic_override:
7616 this_nic_override = nic_override[idx]
7618 this_nic_override = {}
7619 if 'ip' in this_nic_override:
7620 ip = this_nic_override['ip']
7623 if 'mac' in this_nic_override:
7624 mac = this_nic_override['mac']
7627 if idx in self.nic_pnew:
7628 nicparams = self.nic_pnew[idx]
7630 nicparams = objects.FillDict(c_nicparams, nic.nicparams)
7631 mode = nicparams[constants.NIC_MODE]
7632 link = nicparams[constants.NIC_LINK]
7633 args['nics'].append((ip, mac, mode, link))
7634 if constants.DDM_ADD in nic_override:
7635 ip = nic_override[constants.DDM_ADD].get('ip', None)
7636 mac = nic_override[constants.DDM_ADD]['mac']
7637 nicparams = self.nic_pnew[constants.DDM_ADD]
7638 mode = nicparams[constants.NIC_MODE]
7639 link = nicparams[constants.NIC_LINK]
7640 args['nics'].append((ip, mac, mode, link))
7641 elif constants.DDM_REMOVE in nic_override:
7642 del args['nics'][-1]
7644 env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
7645 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)

  @staticmethod
  def _GetUpdatedParams(old_params, update_dict,
                        default_values, parameter_types):
    """Return the new params dict for the given params.

    @type old_params: dict
    @param old_params: old parameters
    @type update_dict: dict
    @param update_dict: dict containing new parameter values,
                        or constants.VALUE_DEFAULT to reset the
                        parameter to its default value
    @type default_values: dict
    @param default_values: default values for the filled parameters
    @type parameter_types: dict
    @param parameter_types: dict mapping target dict keys to types
                            in constants.ENFORCEABLE_TYPES
    @rtype: (dict, dict)
    @return: (new_parameters, filled_parameters)

    """
    params_copy = copy.deepcopy(old_params)
    for key, val in update_dict.iteritems():
      if val == constants.VALUE_DEFAULT:
        try:
          del params_copy[key]
        except KeyError:
          pass
      else:
        params_copy[key] = val
    utils.ForceDictType(params_copy, parameter_types)
    params_filled = objects.FillDict(default_values, params_copy)
    return (params_copy, params_filled)
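
  # Illustrative sketch (not part of the original module): how the
  # (new, filled) pair returned by _GetUpdatedParams behaves. The parameter
  # names and values below are hypothetical:
  #
  #   old = {'kernel_path': '/custom/vmlinuz', 'root_path': '/dev/sda1'}
  #   update = {'kernel_path': constants.VALUE_DEFAULT}
  #   defaults = {'kernel_path': '/boot/vmlinuz', 'root_path': '/dev/vda1'}
  #   new, filled = LUSetInstanceParams._GetUpdatedParams(old, update,
  #                                                       defaults, types)
  #   # new == {'root_path': '/dev/sda1'}     (override dropped, no defaults)
  #   # filled == {'kernel_path': '/boot/vmlinuz', 'root_path': '/dev/sda1'}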

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    cluster = self.cluster = self.cfg.GetClusterInfo()
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict, hv_new = self._GetUpdatedParams(
        instance.hvparams, self.op.hvparams,
        cluster.hvparams[instance.hypervisor],
        constants.HVS_PARAMETER_TYPES)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict, be_new = self._GetUpdatedParams(
        instance.beparams, self.op.beparams,
        cluster.beparams[constants.PP_DEFAULT],
        constants.BES_PARAMETER_TYPES)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      pninfo = nodeinfo[pnode]
      msg = pninfo.fail_msg
      if msg:
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s: %s" %
                         (pnode, msg))
      elif not isinstance(pninfo.payload.get('memory_free', None), int):
        self.warn.append("Node data from primary node %s doesn't contain"
                         " free memory information" % pnode)
      elif instance_info.fail_msg:
        self.warn.append("Can't get instance runtime information: %s" %
                         instance_info.fail_msg)
      else:
        if instance_info.payload:
          current_mem = int(instance_info.payload['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    pninfo.payload['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem,
                                     errors.ECODE_NORES)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.items():
          if node not in instance.secondary_nodes:
            continue
          msg = nres.fail_msg
          if msg:
            self.warn.append("Can't get info from secondary node %s: %s" %
                             (node, msg))
          elif not isinstance(nres.payload.get('memory_free', None), int):
            self.warn.append("Secondary node %s didn't return free"
                             " memory information" % node)
          elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    self.nic_pnew = {}
    self.nic_pinst = {}
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove",
                                     errors.ECODE_INVAL)
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if not instance.nics:
          raise errors.OpPrereqError("Invalid NIC index %s, instance has"
                                     " no NICs" % nic_op,
                                     errors.ECODE_INVAL)
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics) - 1),
                                     errors.ECODE_INVAL)
        old_nic_params = instance.nics[nic_op].nicparams
        old_nic_ip = instance.nics[nic_op].ip
      else:
        old_nic_params = {}
        old_nic_ip = None

      update_params_dict = dict([(key, nic_dict[key])
                                 for key in constants.NICS_PARAMETERS
                                 if key in nic_dict])

      if 'bridge' in nic_dict:
        update_params_dict[constants.NIC_LINK] = nic_dict['bridge']

      new_nic_params, new_filled_nic_params = \
          self._GetUpdatedParams(old_nic_params, update_params_dict,
                                 cluster.nicparams[constants.PP_DEFAULT],
                                 constants.NICS_PARAMETER_TYPES)
      objects.NIC.CheckParameterSyntax(new_filled_nic_params)
      self.nic_pinst[nic_op] = new_nic_params
      self.nic_pnew[nic_op] = new_filled_nic_params
      new_nic_mode = new_filled_nic_params[constants.NIC_MODE]

      if new_nic_mode == constants.NIC_MODE_BRIDGED:
        nic_bridge = new_filled_nic_params[constants.NIC_LINK]
        msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
        if msg:
          msg = "Error checking bridges on node %s: %s" % (pnode, msg)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
      if new_nic_mode == constants.NIC_MODE_ROUTED:
        if 'ip' in nic_dict:
          nic_ip = nic_dict['ip']
        else:
          nic_ip = old_nic_ip
        if nic_ip is None:
          raise errors.OpPrereqError('Cannot set the nic ip to None'
                                     ' on a routed nic', errors.ECODE_INVAL)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None',
                                     errors.ECODE_INVAL)
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId())
        else:
          # or validate/reserve the current one
          try:
            self.cfg.ReserveMAC(nic_mac, self.proc.GetECId())
          except errors.ReservationError:
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac,
                                       errors.ECODE_NOTUNIQUE)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances",
                                 errors.ECODE_INVAL)
    for disk_op, _ in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance", errors.ECODE_INVAL)
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        msg = ins_l.fail_msg
        if msg:
          raise errors.OpPrereqError("Can't contact node %s: %s" %
                                     (pnode, msg), errors.ECODE_ENVIRON)
        if instance.name in ins_l.payload:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.", errors.ECODE_STATE)

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS,
                                   errors.ECODE_STATE)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks) - 1),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))

    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set by now
        mac = nic_dict['mac']
        ip = nic_dict.get('ip', None)
        nicparams = self.nic_pinst[constants.DDM_ADD]
        new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,mode=%s,link=%s" %
                       (new_nic.mac, new_nic.ip,
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
                       )))
      else:
        for key in 'mac', 'ip':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
        if nic_op in self.nic_pinst:
          instance.nics[nic_op].nicparams = self.nic_pinst[nic_op]
        for key, val in nic_dict.iteritems():
          result.append(("nic.%s/%d" % (key, nic_op), val))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance, feedback_fn)

    return result
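
  # Illustrative sketch (not part of the original module): the change list
  # returned by Exec is a list of (parameter, new value) pairs, e.g.
  # (hypothetical values):
  #
  #   [("disk/1", "add:size=1024,mode=rw"),
  #    ("nic.ip/0", "192.0.2.10"),
  #    ("be/memory", 2048)]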


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].fail_msg:
        result[node] = False
      else:
        result[node] = rpcresult[node].payload

    return result
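
# Illustrative sketch (not part of the original module): the dict returned
# by LUQueryExports.Exec maps node names either to an export list or to
# False when the node could not be contacted, e.g. (hypothetical values):
#
#   {"node1.example.com": ["instance1.example.com"],
#    "node2.example.com": False}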


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def CheckArguments(self):
    """Check the arguments.

    """
    self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                    constants.DEFAULT_SHUTDOWN_TIMEOUT)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
    assert self.dst_node is not None

    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks", errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node

    if self.op.shutdown:
      # shutdown the instance, but not the disks
      feedback_fn("Shutting down instance %s" % instance.name)
      result = self.rpc.call_instance_shutdown(src_node, instance,
                                               self.shutdown_timeout)
      result.Raise("Could not shutdown instance %s on"
                   " node %s" % (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    activate_disks = (not instance.admin_up)

    if activate_disks:
      # Activate the instance disks if we're exporting a stopped instance
      feedback_fn("Activating disks for %s" % instance.name)
      _StartInstanceDisks(self, instance, None)

    try:
      # per-disk results
      dresults = []
      try:
        for idx, disk in enumerate(instance.disks):
          feedback_fn("Creating a snapshot of disk/%s on node %s" %
                      (idx, src_node))

          # result.payload will be a snapshot of an lvm leaf of the one we
          # passed
          result = self.rpc.call_blockdev_snapshot(src_node, disk)
          msg = result.fail_msg
          if msg:
            self.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node, msg)
            snap_disks.append(False)
          else:
            disk_id = (vgname, result.payload)
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=disk_id, physical_id=disk_id,
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

      finally:
        if self.op.shutdown and instance.admin_up:
          feedback_fn("Starting instance %s" % instance.name)
          result = self.rpc.call_instance_start(src_node, instance, None, None)
          msg = result.fail_msg
          if msg:
            _ShutdownInstanceDisks(self, instance)
            raise errors.OpExecError("Could not start instance: %s" % msg)

      # TODO: check for size

      cluster_name = self.cfg.GetClusterName()
      for idx, dev in enumerate(snap_disks):
        feedback_fn("Exporting snapshot %s from %s to %s" %
                    (idx, src_node, dst_node.name))
        if dev:
          # FIXME: pass debug from opcode to backend
          result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                                 instance, cluster_name,
                                                 idx, self.op.debug_level)
          msg = result.fail_msg
          if msg:
            self.LogWarning("Could not export disk/%s from node %s to"
                            " node %s: %s", idx, src_node, dst_node.name, msg)
            dresults.append(False)
          else:
            dresults.append(True)
          msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
          if msg:
            self.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", idx, src_node, msg)
        else:
          dresults.append(False)

      feedback_fn("Finalizing export on %s" % dst_node.name)
      result = self.rpc.call_finalize_export(dst_node.name, instance,
                                             snap_disks)
      fin_resu = True
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not finalize export for instance %s"
                        " on node %s: %s", instance.name, dst_node.name, msg)
        fin_resu = False

    finally:
      if activate_disks:
        feedback_fn("Deactivating disks for %s" % instance.name)
        _ShutdownInstanceDisks(self, instance)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    iname = instance.name
    if nodelist:
      feedback_fn("Removing old exports for instance %s" % iname)
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].fail_msg:
          continue
        if iname in exportlist[node].payload:
          msg = self.rpc.call_export_remove(node, iname).fail_msg
          if msg:
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s: %s", iname, node, msg)
    return fin_resu, dresults
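
  # Illustrative sketch (not part of the original module): Exec returns a
  # (fin_resu, dresults) tuple; fin_resu is True when finalization on the
  # target node succeeded, and dresults holds one boolean per disk, e.g.
  # (True, [True, False]) for a two-disk instance whose second snapshot
  # failed to export.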


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
    exportlist = self.rpc.call_export_list(locked_nodes)
    found = False
    for node in exportlist:
      msg = exportlist[node].fail_msg
      if msg:
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
        continue
      if instance_name in exportlist[node].payload:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        msg = result.fail_msg
        if msg:
          logging.error("Could not remove export for instance %s"
                        " on node %s: %s", instance_name, node, msg)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " domain name.")


class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      self.op.name = _ExpandNodeName(self.cfg, self.op.name)
      self.needed_locks[locking.LEVEL_NODE] = self.op.name
    elif self.op.kind == constants.TAG_INSTANCE:
      self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind), errors.ECODE_INVAL)


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err), errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))

    return results
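
  # Illustrative sketch (not part of the original module): searching for the
  # pattern "^web" might return, with hypothetical cluster contents:
  #
  #   [("/instances/web1.example.com", "webfarm"),
  #    ("/nodes/node1.example.com", "webhost")]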


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    self.cfg.Update(self.target, feedback_fn)


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)), errors.ECODE_NOENT)

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    self.cfg.Update(self.target, feedback_fn)
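
  # Illustrative sketch (not part of the original module): the frozenset
  # subset test in CheckPrereq means that with cur_tags == set(["a", "b"])
  # a request to delete ["a", "c"] fails with "Tag(s) 'c' not found" and
  # nothing is removed.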


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      for node, node_result in result.items():
        node_result.Raise("Failure during rpc call to node %s" % node)


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has three sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  # pylint: disable-msg=R0902
  # lots of instance attributes
  _ALLO_KEYS = [
    "name", "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "name", "relocate_from",
    ]
  _EVAC_KEYS = [
    "evac_nodes",
    ]

  def __init__(self, cfg, rpc, mode, **kwargs):
    self.cfg = cfg
    self.rpc = rpc
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    self.name = None
    self.evac_nodes = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.result = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
      fn = self._AddNewInstance
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
      fn = self._AddRelocateInstance
    elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
      keyset = self._EVAC_KEYS
      fn = self._AddEvacuateNodes
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])

    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData(fn)
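
  # Illustrative sketch (not part of the original module): constructing an
  # allocation request from within an LU; all keyword arguments listed in
  # _ALLO_KEYS are mandatory, and the values below are hypothetical:
  #
  #   ial = IAllocator(self.cfg, self.rpc,
  #                    mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="instance1.example.com",
  #                    mem_size=2048,
  #                    disks=[{'size': 1024, 'mode': constants.DISK_RDWR}],
  #                    disk_template=constants.DT_DRBD8,
  #                    os="debootstrap", tags=[], nics=[], vcpus=1,
  #                    hypervisor=constants.HT_XEN_PVM)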

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
      hypervisor_name = cluster_info.enabled_hypervisors[0]

    node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                        hypervisor_name)
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = nresult.payload

        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = objects.FillDict(
            cluster_info.nicparams[constants.PP_DEFAULT],
            nic.nicparams)
        nic_dict = {"mac": nic.mac,
                    "ip": nic.ip,
                    "mode": filled_params[constants.NIC_MODE],
                    "link": filled_params[constants.NIC_LINK],
                   }
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    return request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances",
                                 errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node",
                                 errors.ECODE_STATE)

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    return request

  def _AddEvacuateNodes(self):
    """Add evacuate nodes data to allocator structure.

    """
    request = {
      "evac_nodes": self.evac_nodes
      }
    return request

  def _BuildInputData(self, fn):
    """Build input data structures.

    """
    self._ComputeClusterData()

    request = fn()
    request["type"] = self.mode
    self.in_data["request"] = request

    self.in_text = serializer.Dump(self.in_data)
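
  # Illustrative sketch (not part of the original module): after
  # _BuildInputData, self.in_text is the serialized form of a structure
  # shaped roughly like (hypothetical values):
  #
  #   {"version": ..., "cluster_name": ...,
  #    "nodes": {"node1.example.com": {...}},
  #    "instances": {"instance1.example.com": {...}},
  #    "request": {"type": "allocate", "name": ..., ...}}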

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    # TODO: remove backwards compatibility in later versions
    if "nodes" in rdict and "result" not in rdict:
      rdict["result"] = rdict["nodes"]
      del rdict["nodes"]

    for key in "success", "info", "result":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["result"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'result' key"
                               " is not a list")
    self.out_data = rdict
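
  # Illustrative sketch (not part of the original module): a minimal reply
  # that passes _ValidateResult (hypothetical values):
  #
  #   {"success": True,
  #    "info": "allocation successful",
  #    "result": ["node2.example.com"]}
  #
  # Replies using the legacy "nodes" key are renamed to "result" above.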


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and
    mode test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr, errors.ECODE_INVAL)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname, errors.ECODE_EXISTS)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'",
                                   errors.ECODE_INVAL)
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the 'nics'"
                                     " parameter", errors.ECODE_INVAL)
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'",
                                   errors.ECODE_INVAL)
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the 'disks'"
                                     " parameter", errors.ECODE_INVAL)
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input",
                                   errors.ECODE_INVAL)
      fname = _ExpandInstanceName(self.cfg, self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
      if not hasattr(self.op, "evac_nodes"):
        raise errors.OpPrereqError("Missing attribute 'evac_nodes' on"
                                   " opcode input", errors.ECODE_INVAL)
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode, errors.ECODE_INVAL)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name",
                                   errors.ECODE_INVAL)
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction, errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )
    elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
      ial = IAllocator(self.cfg, self.rpc,
                       mode=self.op.mode,
                       evac_nodes=self.op.evac_nodes)
    else:
      raise errors.ProgrammerError("Uncaught mode %s in"
                                   " LUTestAllocator.Exec", self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
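
# Illustrative sketch (not part of the original module): with direction
# constants.IALLOCATOR_DIR_IN the LU only returns the generated input text,
# while constants.IALLOCATOR_DIR_OUT actually runs the named allocator
# script and returns its raw output. This is typically exercised through
# the gnt-debug allocator command-line tool.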